On 07/22/2009 12:27 AM, Paolo Bonzini wrote:
Your proposal is more general, but still assumes that
- the control structure of the reader is simple.
This is what Bison requires. Actually Bison's reader is complex, but
the output is fed to a Flex scanner, which takes care of the complexity.
Here is some code with an example, the Win32 part I'll do in the next
few days. If I had to choose one of the three cases you mentioned, I'd
include this one in gnulib. It shouldn't be too bad to adapt msgfilter
to this API; for Bison it is overkill.
Paolo
/* Filtering of data through a subprocess.
Copyright (C) 2009 Free Software Foundation, Inc.
Written by Bruno Haible <hai...@clisp.cons.org>, 2009.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#ifndef _PIPE_FILTER_H
#define _PIPE_FILTER_H
#include <stdbool.h>
#include <stddef.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Piping data through a subprocess in the naïve way - write data to the
subprocess and read from the subprocess when you expect it to have
produced results - is subject to two kinds of deadlocks:
1) If you write more than PIPE_MAX bytes or, more generally, if you write
more bytes than the subprocess can handle at once, the subprocess
may write its data and wait on you to read it, but you are currently
busy writing.
2) When you don't know ahead of time how many bytes the subprocess
will produce, the usual technique of calling read (fd, buf, BUFSIZ)
with a fixed BUFSIZ will, on Linux 2.2.17 and on BSD systems, cause
the read() call to block until *all* of the buffer has been filled.
But the subprocess cannot produce more data until you gave it more
input. But you are currently busy reading from it.
This module provides a function that pipes data to a subprocess and
gets its output back via a callback, without risking these deadlocks. */
/* Type of the callback that receives the subprocess' output.
   read_buf points to the bytes that were just read, num_bytes_read is
   their count, and private_data is the pointer that was passed to
   filter_create.  */
typedef void (*done_read_fn) (char *read_buf, size_t num_bytes_read,
void *private_data);
struct filter;
/* Create a subprocess and prepare to pipe data through it.
   progname is the program name used in error messages.
   prog_path is the file name of the program to invoke.
   prog_argv is a NULL terminated argument list, starting with
   prog_path as first element.
   If null_stderr is true, the subprocess' stderr will be redirected
   to /dev/null, and the usual error message to stderr will be
   omitted.  This is suitable when the subprocess does not fulfill an
   important task.
   If exit_on_error is true, any error will cause the main process to
   exit with an error status.
   If the subprocess does not start correctly, exit if exit_on_error is
   true, otherwise return NULL and set errno.
   The caller will write to the subprocess through filter_write; during
   calls to filter_write, the done_read function may be called to
   process any data that the subprocess has written.  done_read will
   receive at most read_bufsize bytes stored into read_buf, as well as
   a copy of private_data.  */
extern struct filter * filter_create (const char *progname,
const char *prog_path,
const char **prog_argv,
bool null_stderr, bool exit_on_error,
char *read_buf, size_t read_bufsize,
done_read_fn done_read,
void *private_data);
/* Write size bytes starting at buf into the pipe and in the meanwhile
   possibly call the done_read function specified in filter_create.
   The done_read function may be called in a different thread than
   the current thread, depending on the platform.  However, it will
   always be called before filter_write has returned (or else will be
   delayed to the next call to filter_write or filter_close).  Return
   only after the entire buffer has been written to the pipe or
   the subprocess has exited.
   If there is a problem reading or writing, return -1 and set errno.
   If the subprocess exits early with nonzero status, return the status.
   (In either case, filter_write will instead exit if exit_on_error was
   passed as true).
   If the subprocess exits early with zero status, subsequent writes
   will become no-ops and zero is returned.
   Otherwise return 0.  */
extern int filter_write (struct filter *f,
const char *buf, size_t size);
/* Finish reading the output via the done_read function specified in
   filter_create.  The done_read function may be called in a different
   thread than the current thread, depending on the platform.  However,
   it will always be called before filter_close has returned.  The
   write side of the pipe is closed as soon as filter_close starts,
   while the read side will be closed just before it finishes.  If
   there is a problem reading or closing the pipe, return -1 and set
   errno.  If the subprocess exits early with nonzero status, return
   the status.  (In either case, filter_close will instead exit if
   exit_on_error was passed as true).
   Otherwise return 0.  The filter object F is freed and must not be
   used afterwards.  */
extern int filter_close (struct filter *f);
#ifdef __cplusplus
}
#endif
#endif /* _PIPE_FILTER_H */
/* Synchronous writing, asynchronous reading of pipes connected to a
subprocess.
Copyright (C) 2009 Free Software Foundation, Inc.
Written by Paolo Bonzini <bonz...@gnu.org>, 2009.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include <config.h>
#include "pipe-filter.h"
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/select.h>
#include <unistd.h>
#include "error.h"
#include "gettext.h"
#include "pipe.h"
#include "wait-process.h"
#include "xalloc.h"
#define _(str) gettext (str)
#ifndef SSIZE_MAX
# define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
#endif
/* We use a child process, and communicate through a bidirectional pipe.
To avoid deadlocks, let the child process decide when it wants to write,
and let the parent read accordingly. The parent uses select() to
know whether it must write or read. On platforms without select(),
we use non-blocking I/O. (This means the parent is busy looping
while waiting for the child. Not good. But hardly any platform
lacks select() nowadays.) */
/* On BeOS select() works only on sockets, not on normal file descriptors. */
#ifdef __BEOS__
# undef HAVE_SELECT
#endif
#ifdef EINTR
/* EINTR handling for close(), read(), write(), select().
These functions can return -1/EINTR even though we don't have any
signal handlers set up, namely when we get interrupted via SIGSTOP. */
/* Like close(), but transparently restart the call whenever it fails
   with EINTR.  Any other result is handed back unchanged.  */
static inline int
nonintr_close (int fd)
{
  for (;;)
    {
      int result = close (fd);

      if (result >= 0 || errno != EINTR)
        return result;
    }
}
#undef close /* avoid warning related to gnulib module unistd */
#define close nonintr_close
/* Like read(), but transparently restart the call whenever it fails
   with EINTR.  Any other result is handed back unchanged.  */
static inline ssize_t
nonintr_read (int fd, void *buf, size_t count)
{
  for (;;)
    {
      ssize_t result = read (fd, buf, count);

      if (result >= 0 || errno != EINTR)
        return result;
    }
}
#define read nonintr_read
/* Like write(), but transparently restart the call whenever it fails
   with EINTR.  Any other result is handed back unchanged.  */
static inline ssize_t
nonintr_write (int fd, const void *buf, size_t count)
{
  for (;;)
    {
      ssize_t result = write (fd, buf, count);

      if (result >= 0 || errno != EINTR)
        return result;
    }
}
#undef write /* avoid warning on VMS */
#define write nonintr_write
# if HAVE_SELECT
/* Like select(), but transparently restart the call whenever it fails
   with EINTR.  Any other result is handed back unchanged.  */
static inline int
nonintr_select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
                struct timeval *timeout)
{
  for (;;)
    {
      int result = select (n, readfds, writefds, exceptfds, timeout);

      if (result >= 0 || errno != EINTR)
        return result;
    }
}
# undef select /* avoid warning on VMS */
# define select nonintr_select
# endif
#endif
/* Non-blocking I/O. */
#ifndef O_NONBLOCK
# define O_NONBLOCK O_NDELAY
#endif
#if HAVE_SELECT
# define IS_EAGAIN(errcode) 0
#else
# ifdef EWOULDBLOCK
# define IS_EAGAIN(errcode) ((errcode) == EAGAIN || (errcode) == EWOULDBLOCK)
# else
# define IS_EAGAIN(errcode) ((errcode) == EAGAIN)
# endif
#endif
/* State of one subprocess filter, created by filter_create and released
   by filter_close.  */
struct filter {
/* Process id of the subprocess, as returned by create_pipe_bidi.  */
pid_t child;
/* Program name used in error messages.  */
const char *progname;
/* Copies of the corresponding filter_create arguments.  */
bool null_stderr;
bool exit_on_error;
/* Caller-supplied buffer (and its size) that receives the subprocess'
   output before it is handed to done_read.  */
char *read_buf;
size_t read_bufsize;
/* Callback invoked with freshly read output, and its opaque argument.  */
done_read_fn done_read;
void *private_data;
/* fd[0] is read from (subprocess output), fd[1] is written to
   (subprocess input).  */
int fd[2];
/* -1 while the subprocess has not been waited for; afterwards the value
   returned by wait_subprocess.  */
int exit_code;
/* Set once reading has hit end-of-file or failed, respectively once
   writing has failed or the write side has been closed.  */
volatile bool reader_terminated;
volatile bool writer_terminated;
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
/* errno values recorded by the helper threads; 0 means no error.  */
volatile int writer_final_errno;
volatile int reader_final_errno;
/* Data currently queued for the writer thread.  */
const char *write_buf;
size_t write_bufsize;
/* Handles of the reader and writer threads.  */
HANDLE hReader, hWriter;
/* thread synchronization yadda yadda */
#else
/* Descriptor sets handed to select() by filter_loop.  */
fd_set readfds, writefds;
#endif
};
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
/* Entry point of the Win32 writer thread; thread_arg is the struct filter
   passed to _beginthreadex by filter_init.
   NOTE(review): placeholder only - the body is not written yet and the
   function still needs to return a value.  */
static unsigned int WINAPI
writer_thread_func (void *thread_arg)
{
/* writing yadda yadda */
}
/* Entry point of the Win32 reader thread; thread_arg is the struct filter
   passed to _beginthreadex by filter_init.
   NOTE(review): placeholder only - the body is not written yet and the
   function still needs to return a value.  */
static unsigned int WINAPI
reader_thread_func (void *thread_arg)
{
/* reading yadda yadda */
}
/* Win32 flavour: reset the per-filter thread state and start the writer
   and the reader thread.  Return 0 when both threads exist.  Otherwise
   exit if exit_on_error is set, else return -1.  */
static int
filter_init (struct filter *f)
{
  bool threads_ok;

  f->writer_final_errno = 0;
  f->reader_final_errno = 0;
  f->write_buf = NULL;
  f->write_bufsize = 0;

  /* create synchronization objects yadda yadda */

  f->hWriter =
    (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, f, 0, NULL);
  f->hReader =
    (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, f, 0, NULL);

  threads_ok = (f->hWriter != NULL && f->hReader != NULL);
  if (threads_ok)
    return 0;

  if (f->exit_on_error)
    error (EXIT_FAILURE, 0, _("creation of threads failed"));
  return -1;
}
/* Win32 flavour: hand BUF/SIZE to the helper threads (not implemented
   yet) and report any error they recorded.  Return 0 if neither thread
   recorded an errno value; otherwise set errno to the writer's value if
   nonzero, else the reader's, and return -1.  */
static int
filter_loop (struct filter *f, const char *buf, size_t size)
{
  int thread_errno;

  /* wake up threads yadda yadda */

  thread_errno = f->writer_final_errno;
  if (thread_errno == 0)
    thread_errno = f->reader_final_errno;

  if (thread_errno == 0)
    return 0;

  errno = thread_errno;
  return -1;
}
/* Win32 flavour: stop the writer thread, close both pipe ends and mark
   both directions as terminated.  try_io is not consulted here.  */
static void
filter_cleanup (struct filter *f, bool try_io)
{
/* Forcibly stop the writer before its pipe end goes away.  */
TerminateThread (f->hWriter, 1);
close (f->fd[1]);
f->writer_terminated = true;
/* clean up synchronization objects yadda yadda */
f->reader_terminated = true;
close (f->fd[0]);
}
#else
/* POSIX flavour: put both pipe ends into non-blocking mode and clear the
   descriptor sets used by filter_loop.

   Non-blocking I/O lets read() and write() return -1/EAGAIN instead of
   blocking, which is what the polling fallback (no HAVE_SELECT) relies
   on.  It also lets them return after partial transfers, which matters
   with select(): select() only tells that some data can be moved, not
   how much, and without non-blocking I/O Linux 2.2.17 and BSD systems
   prefer to block rather than return partial results.

   Return 0 on success.  On failure exit if exit_on_error is set,
   otherwise return -1 with errno set by fcntl.  */
static int
filter_init (struct filter *f)
{
  int idx;

  /* The write end (index 1) is handled first, then the read end.  */
  for (idx = 1; idx >= 0; idx--)
    {
      int flags = fcntl (f->fd[idx], F_GETFL, 0);

      if (flags < 0
          || fcntl (f->fd[idx], F_SETFL, flags | O_NONBLOCK) < 0)
        {
          if (f->exit_on_error)
            error (EXIT_FAILURE, errno,
                   _("cannot set up nonblocking I/O to %s subprocess"),
                   f->progname);
          return -1;
        }
    }

  FD_ZERO (&f->readfds);
  FD_ZERO (&f->writefds);
  return 0;
}
/* POSIX flavour of the I/O pump.

   Write the BUFSIZE bytes at BUF to the subprocess while passing whatever
   the subprocess produces to f->done_read.  Once everything has been
   written (or when BUFSIZE is 0):
     - if the write side is still open, output that is available right now
       is consumed and the function returns without waiting for more;
     - if the write side has been closed (f->writer_terminated), output is
       read until end-of-file.

   Return 0 on success, including end-of-file on the read side, in which
   case f->reader_terminated is set.  On a select/write/read failure, exit
   if f->exit_on_error is set; otherwise set f->writer_terminated (select
   or write failure) or f->reader_terminated (read failure) and return -1
   with errno describing the failure.  */
static int
filter_loop (struct filter *f, const char *buf, size_t bufsize)
{
  for (;;)
    {
      /* Without select() both directions are attempted on every pass and
         EAGAIN tells us which one is not ready.  With select() the two
         flags are narrowed down to the one ready direction below.  */
      bool do_write = bufsize > 0;
      bool do_read = true;

#if HAVE_SELECT
      {
        /* Zero timeout: used to poll for output once nothing is left to
           write but the write side is still open.  Rebuilt on every pass
           because select() is allowed to modify it.  */
        struct timeval tv0 = { 0, 0 };
        int nfds = (f->fd[0] > f->fd[1] ? f->fd[0] : f->fd[1]) + 1;
        int nready;

        if (!f->reader_terminated)
          FD_SET (f->fd[0], &f->readfds);
        if (!f->writer_terminated)
          FD_SET (f->fd[1], &f->writefds);

        nready = select (nfds, &f->readfds,
                         (bufsize ? &f->writefds : NULL), NULL,
                         (!f->writer_terminated && !bufsize ? &tv0 : NULL));
        if (nready == 0)
          /* Polling found no pending output.  */
          return 0;
        if (nready < 0)
          {
            if (f->exit_on_error)
              error (EXIT_FAILURE, errno,
                     _("communication with %s subprocess failed"),
                     f->progname);
            f->writer_terminated = true;
            return -1;
          }

        /* Serve exactly one direction per pass, writing first.  */
        do_write = bufsize && FD_ISSET (f->fd[1], &f->writefds);
        do_read = !do_write && FD_ISSET (f->fd[0], &f->readfds);
        if (!do_write && !do_read)
          return 0;
      }
#endif

      /* Attempt to write.  */
      if (do_write)
        {
          ssize_t nwritten = write (f->fd[1], buf,
                                    bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
          if (nwritten < 0)
            {
              /* EAGAIN (polling mode only) just means the pipe is full:
                 fall through to the read attempt so that the subprocess
                 can make progress, instead of retrying the write.  */
              if (!IS_EAGAIN (errno))
                {
                  if (f->exit_on_error)
                    error (EXIT_FAILURE, errno,
                           _("write to %s subprocess failed"), f->progname);
                  f->writer_terminated = true;
                  return -1;
                }
            }
          else
            {
              bufsize -= nwritten;
              buf += nwritten;
            }
        }

      /* Attempt to read.  */
      if (do_read)
        {
          ssize_t nread = read (f->fd[0], f->read_buf, f->read_bufsize);
          if (nread < 0)
            {
              if (IS_EAGAIN (errno))
                {
                  /* Nothing to read right now.  If all input has been
                     delivered and the write side is still open, we were
                     only polling: stop here instead of spinning.  */
                  if (!bufsize && !f->writer_terminated)
                    return 0;
                  continue;
                }
              if (f->exit_on_error)
                error (EXIT_FAILURE, errno,
                       _("read from %s subprocess failed"), f->progname);
              /* Report the failure instead of disguising it as EOF.  */
              f->reader_terminated = true;
              return -1;
            }
          if (nread == 0)
            {
              /* End of file: the subprocess closed its output.  */
              f->reader_terminated = true;
              return 0;
            }
          f->done_read (f->read_buf, nread, f->private_data);
        }
    }
}
/* POSIX flavour: close the write end of the pipe, optionally drain the
   remaining output of the subprocess, then close the read end.  The
   drain happens only if try_io is true and the read side has not
   terminated yet.  Both directions are marked terminated on return.  */
static void
filter_cleanup (struct filter *f, bool try_io)
{
  bool drain;

  /* Closing our write end lets the subprocess see end-of-file.  */
  close (f->fd[1]);
  f->writer_terminated = true;

  drain = try_io && !f->reader_terminated;
  if (drain)
    filter_loop (f, NULL, 0);

  f->reader_terminated = true;
  close (f->fd[0]);
}
#endif
/* Shut the filter down once: release the pipe (draining pending output
   only if neither direction has terminated yet), wait for the subprocess
   and record its status in f->exit_code.  Does nothing if a status has
   already been recorded.  With exit_on_error set, a nonzero status makes
   the process exit.  */
static void
filter_terminate (struct filter *f)
{
  bool drain_output;

  if (f->exit_code != -1)
    return;

  drain_output = !(f->reader_terminated || f->writer_terminated);
  filter_cleanup (f, drain_output);

  f->exit_code = wait_subprocess (f->child, f->progname, true,
                                  f->null_stderr, true, f->exit_on_error,
                                  NULL);
  if (f->exit_code != 0 && f->exit_on_error)
    error (EXIT_FAILURE, 0, _("subprocess %s failed (exit status %d)"),
           f->progname, f->exit_code);
}
/* Create a subprocess and prepare to pipe data through it.
   progname is the program name used in error messages.
   prog_path is the file name of the program to invoke.
   prog_argv is a NULL terminated argument list, starting with
   prog_path as first element.
   If null_stderr is true, the subprocess' stderr will be redirected
   to /dev/null, and the usual error message to stderr will be
   omitted.  This is suitable when the subprocess does not fulfill an
   important task.
   If exit_on_error is true, any error will cause the main process to
   exit with an error status.
   If the subprocess cannot be created, exit if exit_on_error is
   true, otherwise return NULL and set errno.
   If the subprocess was created but the pipe could not be set up, the
   filter is returned in the terminated state: filter_write reports the
   subprocess' exit status and filter_close must still be called.
   The caller will write to the subprocess through filter_write; during
   calls to filter_write, the done_read function may be called to
   process any data that the subprocess has written.  done_read will
   receive at most read_bufsize bytes stored into read_buf, as well as
   a copy of private_data.  */
struct filter *
filter_create (const char *progname,
               const char *prog_path, const char **prog_argv,
               bool null_stderr, bool exit_on_error,
               char *read_buf, size_t read_bufsize,
               done_read_fn done_read,
               void *private_data)
{
  struct filter *f = xmalloc (sizeof *f);
  int fd[2];

  /* Open a bidirectional pipe to a subprocess.  */
  f->child = create_pipe_bidi (progname, prog_path, (char **) prog_argv,
                               null_stderr, true, exit_on_error,
                               fd);
  if (f->child == -1)
    {
      /* create_pipe_bidi is expected to report failure as -1 with errno
         set; fd[] is not valid then, so nothing else may touch it.
         Preserve errno across free().  */
      int saved_errno = errno;
      free (f);
      errno = saved_errno;
      return NULL;
    }

  f->progname = progname;
  f->null_stderr = null_stderr;
  f->exit_on_error = exit_on_error;
  f->exit_code = -1;
  f->read_buf = read_buf;
  f->read_bufsize = read_bufsize;
  f->done_read = done_read;
  f->private_data = private_data;
  f->reader_terminated = false;
  f->writer_terminated = false;
  f->fd[0] = fd[0];
  f->fd[1] = fd[1];

  /* If the pipe cannot be set up, reap the subprocess right away; the
     caller then sees its exit status through filter_write/filter_close.  */
  if (filter_init (f) < 0)
    filter_terminate (f);
  return f;
}
/* Feed SIZE bytes starting at BUF to the subprocess, passing any output
   that becomes available in the meantime to the done_read callback given
   to filter_create.  BUF must not be NULL.

   If the subprocess has already been waited for, nothing is written and
   its recorded exit status is returned (so after a clean early exit,
   further writes are no-ops that return 0).  A SIZE of 0 returns 0
   immediately.

   If neither direction of the pipe terminated while pumping, return 0.
   Otherwise the filter is shut down and the subprocess is waited for;
   the result is the negative status of the I/O loop if it failed (errno
   is preserved from that failure), else the subprocess' exit status.
   With exit_on_error set, failures make the process exit instead.  */
int
filter_write (struct filter *f, const char *buf, size_t size)
{
  int loop_status;
  int loop_errno;

  assert (buf);

  if (f->exit_code != -1)
    return f->exit_code;
  if (size == 0)
    return 0;

  loop_status = filter_loop (f, buf, size);
  if (!(f->reader_terminated || f->writer_terminated))
    return 0;

  /* One side is gone: reap the subprocess, keeping the errno of the I/O
     failure (if any) visible to the caller.  */
  loop_errno = errno;
  filter_terminate (f);
  errno = loop_errno;

  if (loop_status < 0)
    return loop_status;
  return f->exit_code;
}
/* Finish the filter: shut it down through filter_terminate unless that
   has already happened (closing the write side, delivering the remaining
   output to done_read, closing the read side and waiting for the
   subprocess), then free F.  Return the exit status recorded for the
   subprocess; errno is preserved across the release of F.  With
   exit_on_error set, a failure makes the process exit instead.  */
int
filter_close (struct filter *f)
{
  filter_terminate (f);

  {
    const int status = f->exit_code;
    const int saved_errno = errno;

    free (f);
    errno = saved_errno;
    return status;
  }
}
Description:
Filtering of data through a subprocess.
Files:
lib/pipe-filter.h
lib/pipe-filter.c
Depends-on:
pipe
wait-process
error
exit
gettext-h
stdbool
stdint
sys_select
unistd
xalloc
configure.ac:
AC_REQUIRE([AC_C_INLINE])
AC_CHECK_FUNCS([select])
Makefile.am:
lib_SOURCES += pipe-filter.c
Include:
"pipe-filter.h"
License:
GPL
Maintainer:
Paolo Bonzini
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "pipe-filter.h"
/* done_read callback that discards the subprocess' output.  */
void
dummy_read_cb (char *buf, size_t nread, void *data)
{
  (void) buf;
  (void) nread;
  (void) data;
}
/* done_read callback that copies the NREAD bytes at BUF to standard
   output.  write() may transfer fewer bytes than requested or be
   interrupted, so keep going until everything is out; give up silently
   on any other error, since a void callback cannot report it.  */
void
read_cb (char *buf, size_t nread, void *data)
{
  (void) data;

  while (nread > 0)
    {
      ssize_t nwritten = write (STDOUT_FILENO, buf, nread);

      if (nwritten < 0)
        {
          if (errno == EINTR)
            continue;
          return;
        }
      buf += nwritten;
      nread -= (size_t) nwritten;
    }
}
/* Write the NUL-terminated string S (without its terminator) to filter F
   and return the result of filter_write.  */
int
filter_writes (struct filter *f, const char *s)
{
  return filter_write (f, s, strlen (s));
}
/* Test driver for the pipe-filter module.  Uses the m4 named by the M4
   environment variable, or /usr/bin/m4.  Exits with status 77 (test
   skipped) when m4 cannot be run, aborts on an unexpected result, and
   returns 0 on success.  */
int
main (void)
{
  struct filter *f;
  const char *m4 = getenv ("M4");
  const char *path[] = { NULL, NULL };
  char buf[512];
  int rc;

  if (!m4)
    m4 = "/usr/bin/m4";

  /* Test writing to a nonexistent program traps sooner or later.  */
  path[0] = "/nonexistent/blah";
  f = filter_create ("pipe-filter-test", path[0], path, true, false,
                     buf, sizeof buf, dummy_read_cb, NULL);
  sleep (1);
  signal (SIGPIPE, SIG_IGN);
  /* A NULL result means the failure was already detected at creation
     time, which is an acceptable outcome too.  */
  if (f != NULL)
    {
      if (filter_write (f, "", 1) != -1)
        abort ();
      if (filter_close (f) != 127)
        abort ();
    }

  /* Test whether we have m4, and test returning the exit status.  */
  path[0] = m4;
  f = filter_create ("pipe-filter-test", m4, path, true, false,
                     buf, sizeof buf, dummy_read_cb, NULL);
  if (f == NULL)
    exit (77);
  sleep (1);
  if (filter_write (f, "", 1) == -1)
    exit (77);
  filter_writes (f, "m4exit(1)");
  if (filter_close (f) != 1)
    abort ();

  /* Same, but test returning the status in filter_write.  */
  path[0] = m4;
  f = filter_create ("pipe-filter-test", m4, path, true, false,
                     buf, sizeof buf, dummy_read_cb, NULL);
  if (f == NULL)
    abort ();
  filter_writes (f, "m4exit(1)");
  sleep (1);
  rc = filter_write (f, "", 1);
  if (rc != 1 && rc != -1)
    abort ();
  if (filter_close (f) != 1)
    abort ();

  /* Now test asynchronous I/O.  With exit_on_error set, any failure
     terminates the process, so the results need no checking here.  */
  f = filter_create ("pipe-filter-test", m4, path, true, true,
                     buf, sizeof buf, read_cb, NULL);
  filter_writes (f, "divert(`-1')\n");
  filter_writes (f, "define(`forloop', `pushdef(`$1', `$2')"
                 "_$0($@)popdef(`$1')')\n");
  filter_writes (f, "define(`_forloop', `$4`'ifelse($1, `$3', `', "
                 "`define(`$1', incr($1))$0($@)')')divert`'dnl\n");
  filter_writes (f, "forloop(`i', `1', `500', `i\n')");
  filter_writes (f, "forloop(`i', `501', `1000', `i\n')");
  filter_close (f);

  return 0;
}