Stefan Beller <[email protected]> writes:
> time -->
> output: |---A---| |-B-| |----C-----------| |-D-| |-E-|
Be nice and distribute the dashes evenly around "C". The same goes
for thread 2 below.
> diff --git a/run-command.c b/run-command.c
> index c892e9a..3af97ab 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -3,6 +3,7 @@
> #include "exec_cmd.h"
> #include "sigchain.h"
> #include "argv-array.h"
> +#include "thread-utils.h"
>
> void child_process_init(struct child_process *child)
> {
> @@ -862,3 +863,230 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
> close(cmd->out);
> return finish_command(cmd);
> }
> +
> +struct parallel_processes {
> + int max_number_processes;
> + void *data;
> + get_next_task fn;
> + handle_child_starting_failure fn_err;
> + handle_child_return_value fn_exit;
The 'fn' feels really misnamed, especially when compared with the
other fields. fn_task or something, perhaps.
Also, I think we usually give the function types we define names
that end with _fn, e.g.
	typedef void (*show_commit_fn)(struct commit *, void *);
	void traverse_commit_list(struct rev_info *revs,
				  show_commit_fn show_commit,
				  show_object_fn show_object,
				  void *data);
So perhaps
	get_next_task_fn get_next_task;
	start_failure_fn start_failure;
	return_value_fn return_value;
or something like that.
> + int nr_processes;
> + int all_tasks_started;
> + int foreground_child;
> + char *slots;
What does slots[i] mean? Whatever explanation you would use as an
answer to that question, I'd name the field after the key words used
in the explanation. For example, if it means "children[i] is in use
with a process", then the code would be a lot happier if the field
is called in_use[] or something.
But do not just rename the field yet...
> + struct child_process *children;
> + struct pollfd *pfd;
> + struct strbuf *err;
struct pollfd needs to be a contiguous array of max_number_processes
elements because that is what poll(2) takes, but the other per-child
fields would be easier to grasp if you did it like so:
	struct parallel_processes {
		...
		struct {
			int in_use;
			struct child_process child;
			struct strbuf err;
			... /* maybe other per-child fields later */
		} *children;
		...
	};
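A minimal sketch of that layout, assuming a stand-in struct child_proc in place of struct child_process and leaving the per-child strbuf out; pp_init(), pp_find_free_slot(), pp_claim_slot() and pp_release_slot() are hypothetical names. pfd[] stays a separate contiguous array for poll(2), and the helpers keep it in sync with children[].

```c
#include <assert.h>
#include <poll.h>
#include <stdlib.h>
#include <sys/types.h>

/* Stand-in for git's struct child_process (assumption). */
struct child_proc {
	pid_t pid;
	int err;                /* read end of the child's stderr pipe */
};

struct parallel_processes {
	int max_processes;
	int nr_processes;
	struct {
		int in_use;
		struct child_proc child;
		/* a per-child output buffer would live here too */
	} *children;
	struct pollfd *pfd;     /* separate: poll(2) wants a plain array */
};

static int pp_init(struct parallel_processes *pp, int n)
{
	int i;

	pp->max_processes = n;
	pp->nr_processes = 0;
	pp->children = calloc(n, sizeof(*pp->children));
	pp->pfd = calloc(n, sizeof(*pp->pfd));
	if (!pp->children || !pp->pfd) {
		free(pp->children);
		free(pp->pfd);
		return -1;
	}
	for (i = 0; i < n; i++) {
		pp->pfd[i].fd = -1;     /* poll(2) ignores negative fds */
		pp->pfd[i].events = POLLIN;
	}
	return 0;
}

/* Index of a free slot, or -1 if every slot is in use. */
static int pp_find_free_slot(const struct parallel_processes *pp)
{
	int i;

	for (i = 0; i < pp->max_processes; i++)
		if (!pp->children[i].in_use)
			return i;
	return -1;
}

/* Occupy slot i with child c and point pfd[i] at its stderr. */
static void pp_claim_slot(struct parallel_processes *pp, int i,
			  struct child_proc c)
{
	assert(!pp->children[i].in_use);
	pp->children[i].in_use = 1;
	pp->children[i].child = c;
	pp->pfd[i].fd = c.err;
	pp->nr_processes++;
}

/* Free slot i; pfd[i] goes back to being ignored by poll(2). */
static void pp_release_slot(struct parallel_processes *pp, int i)
{
	assert(pp->children[i].in_use);
	pp->children[i].in_use = 0;
	pp->pfd[i].fd = -1;
	pp->nr_processes--;
}
```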
> + struct strbuf finished_children;
A strbuf that holds "finished_children"? Does it hold "what
finished_children said"?
We care about good naming because clearly named variables and fields
make the code easier to read.
> +static void unblock_fd(int fd)
I would probably have called this "set_nonblocking()".
> +{
> + int flags = fcntl(fd, F_GETFL);
> + if (flags < 0) {
> + warning("Could not get file status flags, "
> + "output will be degraded");
> + return;
> + }
> + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
> + warning("Could not set file status flags, "
> + "output will be degraded");
The first line of "warning(" is indented one level too deep.
> + return;
> + }
Wouldn't it be easier to follow if you did
	fn(...)
	{
		if (flags < 0)
			warn();
		else if (fcntl() < 0)
			warn();
	}
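Spelled out, that shape might look like the sketch below, using the set_nonblocking() name suggested above. fprintf() stands in for git's warning(), and the int return value is an addition over the patch's void helper so a caller can tell whether it worked.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Put fd into non-blocking mode; warn (but carry on) on failure. */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags < 0)
		fprintf(stderr, "warning: could not get file status flags, "
			"output will be degraded\n");
	else if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		fprintf(stderr, "warning: could not set file status flags, "
			"output will be degraded\n");
	else
		return 0;
	return -1;
}
```

Callers that only care about the warning can simply ignore the return value.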
> +static void run_processes_parallel_start_new(struct parallel_processes *pp)
> +{
> + int i;
> + /* Start new processes. */
Drop this comment and replace it with a blank line.
> + while (!pp->all_tasks_started
> + && pp->nr_processes < pp->max_number_processes) {
Remove "&& " and add " &&" at the end of the previous line.
> + for (i = 0; i < pp->max_number_processes; i++)
> + if (!pp->slots[i])
> + break; /* found an empty slot */
The comment does not help us at all. "if (...) break;" already says
as much, and it does not tell us what it means for slots[i] to be
empty.
	for (...)
		if (!pp->children[i].in_use)
			break;
would be a lot easier to follow without any comment.
> + if (i == pp->max_number_processes)
> + die("BUG: bookkeeping is hard");
> +
> + if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) {
This use pattern (and the explanation in run-command.h) suggests
that the field should be renamed s/fn/more_task/ or something along
those lines, with the return value flipped, i.e. more_task() returns
true if it grabbed another task and false if there are no more tasks.
> + pp->all_tasks_started = 1;
> + break;
> + }
> + if (start_command(&pp->children[i]))
> + pp->fn_err(pp->data, &pp->children[i], &pp->err[i]);
Can fn_err be NULL here? Shouldn't it be allowed to be, to give a
default behaviour to lazy or bog-standard callers?
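One way to do that is to resolve NULL callbacks once at init time so the main loop never has to check. This is a sketch only: struct child_proc, default_start_failure(), default_return_value() and init_callbacks() are hypothetical, and what a sensible default should do is a design decision left open here (printing an error, and ignoring the exit code, respectively).

```c
#include <assert.h>
#include <stdio.h>

struct child_proc {             /* stand-in for struct child_process */
	const char *argv0;
};

typedef void (*start_failure_fn)(void *data, struct child_proc *cp);
typedef void (*return_value_fn)(void *data, struct child_proc *cp, int result);

/* Hypothetical default: report the failure on stderr. */
static void default_start_failure(void *data, struct child_proc *cp)
{
	(void)data;
	fprintf(stderr, "error: cannot start '%s'\n", cp->argv0);
}

/* Hypothetical default: ignore the exit code. */
static void default_return_value(void *data, struct child_proc *cp, int result)
{
	(void)data;
	(void)cp;
	(void)result;
}

struct callbacks {
	start_failure_fn start_failure;
	return_value_fn return_value;
};

/* Replace NULL callbacks with defaults once, up front. */
static void init_callbacks(struct callbacks *cb,
			   start_failure_fn start_failure,
			   return_value_fn return_value)
{
	cb->start_failure = start_failure ? start_failure : default_start_failure;
	cb->return_value = return_value ? return_value : default_return_value;
}
```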
> + unblock_fd(pp->children[i].err);
> +
> + pp->nr_processes++;
> + pp->slots[i] = 1;
> + pp->pfd[i].fd = pp->children[i].err;
> + }
> +}
> +
> +static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
> +{
> + int i;
Have a blank line here between decls and the first statement.
> + i = poll(pp->pfd, pp->max_number_processes, 100);
Use a symbolic constant for this 100ms, e.g. OUTPUT_POLL_INTERVAL
or something.
> + if (i < 0) {
> + if (errno == EINTR)
> + /* A signal was caught; try again */
> + return -1;
> + else {
> + run_processes_parallel_cleanup(pp);
> + die_errno("poll");
> + }
> + }
Shouldn't this be more like
	while ((i = poll()) < 0) {
		if (errno == EINTR)
			continue;
		cleanup;
		die;
	}
The caller after all was willing to wait for some time, and we were
interrupted before that time came.
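A sketch of that retry loop with the interval given a name; OUTPUT_POLL_INTERVAL_MS and poll_for_output() are hypothetical, and perror() plus exit(128) stand in for git's cleanup and die_errno("poll"). Note that each restart waits the full interval again.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical name for the patch's hard-coded 100ms. */
#define OUTPUT_POLL_INTERVAL_MS 100

/* Wait for output on pfd[0..n), restarting if a signal interrupts us. */
static int poll_for_output(struct pollfd *pfd, nfds_t n)
{
	int ret;

	while ((ret = poll(pfd, n, OUTPUT_POLL_INTERVAL_MS)) < 0) {
		if (errno == EINTR)
			continue;       /* interrupted early: wait again */
		perror("poll");
		exit(128);              /* stand-in for cleanup + die_errno() */
	}
	return ret;                     /* number of ready fds, 0 on timeout */
}
```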
> + /* Buffer output from all pipes. */
> + for (i = 0; i < pp->max_number_processes; i++) {
> + if (!pp->slots[i])
> + continue;
> + if (pp->pfd[i].revents & POLLIN)
> + strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
> + if (pp->foreground_child == i) {
> + fputs(pp->err[i].buf, stderr);
> + strbuf_reset(&pp->err[i]);
> + }
Even if child i owns the output channel, we may not have read
anything yet: poll() may have said that pfd[i] is not ready, or
read_nonblock() may have returned EWOULDBLOCK. Perhaps check not
just that i owns the output but also that err[i].len is not zero?
I think the output should be done outside the loop, and the comment
before the loop should match what the loop actually does.
	/* Buffer output from all pipes. */
	for (i = 0; ...) {
		if (pp->children[i].in_use && (pp->pfd[i].revents & POLLIN))
			strbuf_read_nonblock();
	}
	/* Drain the output from the owner of the output channel */
	if (pp->children[pp->output_owner].in_use &&
	    pp->children[pp->output_owner].err.len) {
		fputs(...);
		strbuf_reset(...);
	}
> +static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
> +{
> + int i = 0;
> + pid_t pid;
> + int wait_status, code;
> + int n = pp->max_number_processes;
> + /* Collect finished child processes. */
Drop this comment and replace it with a blank line.
> + while (pp->nr_processes > 0) {
> + pid = waitpid(-1, &wait_status, WNOHANG);
> + if (pid == 0)
> + return; /* no child finished */
Do we need that comment?
> + if (pid < 0) {
> + if (errno == EINTR)
> + return; /* just try again next time */
Can we get EINTR here (we are passing WNOHANG above)?
> + if (errno == EINVAL || errno == ECHILD)
> + die_errno("wait");
What should happen when we get an error not listed here? 'i' is
left as initialized to 0 and we do strbuf_read_nonblock() for the
first child (which may not even be running)?
You can sweep this bug under the rug by returning here, but I
suspect that you would just want
	if (pid < 0)
		die_errno();
And that would allow you to dedent the else clause below.
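With that change the reaping step flattens out to something like the sketch below. reap_one() is a hypothetical name, a plain pids[] array (0 meaning "free slot") stands in for the children array, and perror() plus exit(128) stand in for die_errno("wait").

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Reap at most one finished child without blocking. Returns the slot
 * of the child that exited, or -1 if none has finished yet (or the
 * pid is not one of ours). Any waitpid() error is fatal.
 */
static int reap_one(const pid_t *pids, int n, int *wait_status)
{
	int i;
	pid_t pid = waitpid(-1, wait_status, WNOHANG);

	if (pid == 0)
		return -1;
	if (pid < 0) {
		perror("wait");
		exit(128);      /* stand-in for die_errno("wait") */
	}
	for (i = 0; i < n; i++)
		if (pids[i] == pid)
			return i;
	return -1;
}
```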
> + } else {
> + /* Find the finished child. */
> + for (i = 0; i < pp->max_number_processes; i++)
> + if (pp->slots[i] && pid == pp->children[i].pid)
> + break;
Hmm, a valid pid can never be 0, so you could just use
pp->children[i].child.pid to see whether a "slot" is occupied,
without using pp->slots[] (or pp->children[i].in_use) at all.
> + if (i == pp->max_number_processes)
> + /*
> + * waitpid returned another process id
> + * which we are not waiting for.
> + */
> + return;
> + }
> + strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
This is to read leftover output?
As discussed elsewhere, read_nonblock() will have to have "read
some, not necessarily to the end" semantics to serve the caller in
run_processes_parallel_buffer_stderr(), so you'd need a loop around
it here to read until you see EOF.
Or you may be able to just call strbuf_read() and the function may
do the right thing to read things through to the EOF. It depends on
how you redo the patch [2/10].
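For the "loop until EOF" option, a sketch of what such a loop has to handle on a non-blocking fd: a single read() only returns what is available now, so on EAGAIN it waits with poll() and tries again. read_to_eof() is a hypothetical helper returning a malloc'd buffer, not an existing git function.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Read fd until EOF into a malloc'd buffer; returns length or -1. */
static ssize_t read_to_eof(int fd, char **out)
{
	size_t len = 0, alloc = 0;
	char *buf = NULL;

	for (;;) {
		ssize_t n;

		if (alloc - len < 4096) {
			char *p = realloc(buf, alloc + 8192);

			if (!p) {
				free(buf);
				return -1;
			}
			buf = p;
			alloc += 8192;
		}
		n = read(fd, buf + len, alloc - len);
		if (n > 0) {
			len += (size_t)n;
		} else if (n == 0) {            /* EOF: writer closed its end */
			*out = buf;
			return (ssize_t)len;
		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
			/* nothing yet: wait for data (or EOF) and retry */
			struct pollfd pfd = { .fd = fd, .events = POLLIN };

			if (poll(&pfd, 1, -1) < 0 && errno != EINTR) {
				free(buf);
				return -1;
			}
		} else if (errno != EINTR) {
			free(buf);
			return -1;
		}
	}
}
```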
> + if (determine_return_value(wait_status, &code, &errno,
> + pp->children[i].argv[0]) < 0)
> + error("waitpid is confused (%s)",
> + pp->children[i].argv[0]);
> +
> + pp->fn_exit(pp->data, &pp->children[i], code);
You are clobbering errno by passing &errno to
determine_return_value(), but you do not use the value it stores
there anywhere. Is that intended? Or should it be given to fn_exit()
for error reporting?
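If the failure reason is meant for fn_exit(), one option is to return it through a local rather than through errno. decode_wait_status() below is a hypothetical helper, not the patch's determine_return_value(); it maps a signal death to 128 plus the signal number, as a POSIX shell reports it, and treats exit code 127 (the conventional "command not found" code) as ENOENT.

```c
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Turn a waitpid() status into an exit code without touching the
 * global errno; any failure reason goes into *failed_errno so the
 * caller can hand it to the exit callback.
 */
static int decode_wait_status(int status, int *failed_errno)
{
	*failed_errno = 0;
	if (WIFSIGNALED(status))
		return 128 + WTERMSIG(status);
	if (WIFEXITED(status)) {
		int code = WEXITSTATUS(status);

		if (code == 127)        /* conventionally "command not found" */
			*failed_errno = ENOENT;
		return code;
	}
	return -1;      /* e.g. stopped; not expected without WUNTRACED */
}
```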
> + argv_array_clear(&pp->children[i].args);
> + argv_array_clear(&pp->children[i].env_array);
> +
> + pp->nr_processes--;
> + pp->slots[i] = 0;
> + pp->pfd[i].fd = -1;
Mental note: here the "slot" is cleared for the child 'i'.
> + if (i != pp->foreground_child) {
> + strbuf_addbuf(&pp->finished_children, &pp->err[i]);
> + strbuf_reset(&pp->err[i]);
OK, so the idea is that pp->err[i] holds the entire output of any
process that does not own the output channel until it dies, and it
is then appended to pp->finished_children. That suggests that the
name of the "finished" field should have "output" somewhere in it.
> + } else {
Mental note: this side of if/else is what happens to the process
that used to own the output channel.
> + fputs(pp->err[i].buf, stderr);
> + strbuf_reset(&pp->err[i]);
... and it just flushes the final part of the output.
> + /* Output all other finished child processes */
> + fputs(pp->finished_children.buf, stderr);
> + strbuf_reset(&pp->finished_children);
If there is any, that is.
> + /*
> + * Pick next process to output live.
> + * NEEDSWORK:
> + * For now we pick it randomly by doing a round
> + * robin. Later we may want to pick the one with
> + * the most output or the longest or shortest
> + * running process time.
> + */
> + for (i = 0; i < n; i++)
> + if (pp->slots[(pp->foreground_child + i) % n])
> + break;
> + pp->foreground_child = (pp->foreground_child + i) % n;
... and then picks a new owner of the output channel.
Up to this point it looks sensible.
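The round-robin pick above, pulled out into a hypothetical helper over a plain in_use[] array: it scans forward from the current owner (inclusive, wrapping around) for the first slot still in use, and leaves the owner unchanged if no slot is in use, exactly as the patch's loop does when i reaches n.

```c
#include <assert.h>

/* Next owner of the output channel, chosen round-robin. */
static int pick_next_owner(const int *in_use, int n, int current)
{
	int i;

	assert(n > 0 && current >= 0 && current < n);
	for (i = 0; i < n; i++)
		if (in_use[(current + i) % n])
			break;
	return (current + i) % n;       /* == current if none is in use */
}
```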
> + fputs(pp->err[pp->foreground_child].buf, stderr);
> + strbuf_reset(&pp->err[pp->foreground_child]);
I do not think these two lines need to be here, especially if you
follow the above advice of separating buffering and draining.
> +int run_processes_parallel(int n, void *data,
> + get_next_task fn,
> + handle_child_starting_failure fn_err,
> + handle_child_return_value fn_exit)
> +{
> + struct parallel_processes pp;
> + run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit);
> +
> + while (!pp.all_tasks_started || pp.nr_processes > 0) {
The former is true as long as more_task() says there may be more.
The latter is true as long as we have something already running.
In either case, we should keep collecting and spawning as needed.
> + run_processes_parallel_start_new(&pp);
But calling start_new() unconditionally feels sloppy. It should at
least be something like
	if (pp.nr_processes < pp.max_number_processes &&
	    !pp.all_tasks_started)
		run_processes_parallel_start_new(&pp);
no?
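A toy model of the main loop with that guard may make the control flow easier to see. Everything here is hypothetical: "starting" a task only bumps counters and each collect step finishes one running task, so no real processes are involved.

```c
#include <assert.h>

struct sim {
	int tasks_left;
	int max_processes;
	int nr_processes;
	int all_tasks_started;
	int finished;
	int start_calls;        /* how often start_new() was entered */
};

/* Fill free slots until they are full or the tasks run out. */
static void start_new(struct sim *s)
{
	s->start_calls++;
	while (!s->all_tasks_started && s->nr_processes < s->max_processes) {
		if (!s->tasks_left) {
			s->all_tasks_started = 1;
			break;
		}
		s->tasks_left--;
		s->nr_processes++;
	}
}

/* Pretend one running task has finished. */
static void collect_finished(struct sim *s)
{
	if (s->nr_processes > 0) {
		s->nr_processes--;
		s->finished++;
	}
}

static void run_loop(struct sim *s)
{
	while (!s->all_tasks_started || s->nr_processes > 0) {
		/* only try to spawn if there is room and maybe more work */
		if (s->nr_processes < s->max_processes && !s->all_tasks_started)
			start_new(s);
		collect_finished(s);
	}
}
```

With 10 tasks and 4 slots the loop runs 10 times, but start_new() is entered only 8 times; once all tasks have been started, the remaining iterations just collect.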
> diff --git a/test-run-command.c b/test-run-command.c
> index 89c7de2..70b6c7a 100644
> --- a/test-run-command.c
> +++ b/test-run-command.c
> @@ -30,6 +50,10 @@ int main(int argc, char **argv)
> if (!strcmp(argv[1], "run-command"))
> exit(run_command(&proc));
>
> + if (!strcmp(argv[1], "run-command-parallel-4"))
> + exit(run_processes_parallel(4, &proc, parallel_next,
> + NULL, NULL));
So not only fn_err but also fn_exit is optional. You'd need to update
the code above to allow both of them to be NULL.
> +
> fprintf(stderr, "check usage\n");
> return 1;
> }
It was a fun read. There were tons of niggles, but I didn't see
anything fundamentally unsalvageable.
Thanks.