Merge branch 'jk/epipe-in-async'
author Junio C Hamano <gitster@pobox.com>
Fri, 26 Feb 2016 21:37:26 +0000 (13:37 -0800)
committer Junio C Hamano <gitster@pobox.com>
Fri, 26 Feb 2016 21:37:26 +0000 (13:37 -0800)
Handling of errors while writing into our internal asynchronous
process has been made more robust, which reduces flakiness in our
tests.

* jk/epipe-in-async:
t5504: handle expected output from SIGPIPE death
test_must_fail: report number of unexpected signal
fetch-pack: ignore SIGPIPE in sideband demuxer
write_or_die: handle EPIPE in async threads

1  2 
run-command.c
run-command.h
diff --combined run-command.c
index 019f6d19a5a10718bd4aa94cf1076312e0297016,3add1d66ac344feab2ca39ee6fff663628c5fe32..863dad52f1913d8fa20fccd23fc527580c6fcaf7
@@@ -3,8 -3,6 +3,8 @@@
  #include "exec_cmd.h"
  #include "sigchain.h"
  #include "argv-array.h"
 +#include "thread-utils.h"
 +#include "strbuf.h"
  
  void child_process_init(struct child_process *child)
  {
@@@ -160,41 -158,50 +160,41 @@@ int sane_execvp(const char *file, char 
        return -1;
  }
  
 -static const char **prepare_shell_cmd(const char **argv)
 +static const char **prepare_shell_cmd(struct argv_array *out, const char **argv)
  {
 -      int argc, nargc = 0;
 -      const char **nargv;
 -
 -      for (argc = 0; argv[argc]; argc++)
 -              ; /* just counting */
 -      /* +1 for NULL, +3 for "sh -c" plus extra $0 */
 -      nargv = xmalloc(sizeof(*nargv) * (argc + 1 + 3));
 -
 -      if (argc < 1)
 +      if (!argv[0])
                die("BUG: shell command is empty");
  
        if (strcspn(argv[0], "|&;<>()$`\\\"' \t\n*?[#~=%") != strlen(argv[0])) {
  #ifndef GIT_WINDOWS_NATIVE
 -              nargv[nargc++] = SHELL_PATH;
 +              argv_array_push(out, SHELL_PATH);
  #else
 -              nargv[nargc++] = "sh";
 +              argv_array_push(out, "sh");
  #endif
 -              nargv[nargc++] = "-c";
 -
 -              if (argc < 2)
 -                      nargv[nargc++] = argv[0];
 -              else {
 -                      struct strbuf arg0 = STRBUF_INIT;
 -                      strbuf_addf(&arg0, "%s \"$@\"", argv[0]);
 -                      nargv[nargc++] = strbuf_detach(&arg0, NULL);
 -              }
 -      }
 +              argv_array_push(out, "-c");
  
 -      for (argc = 0; argv[argc]; argc++)
 -              nargv[nargc++] = argv[argc];
 -      nargv[nargc] = NULL;
 +              /*
 +               * If we have no extra arguments, we do not even need to
 +               * bother with the "$@" magic.
 +               */
 +              if (!argv[1])
 +                      argv_array_push(out, argv[0]);
 +              else
 +                      argv_array_pushf(out, "%s \"$@\"", argv[0]);
 +      }
  
 -      return nargv;
 +      argv_array_pushv(out, argv);
 +      return out->argv;
  }
  
  #ifndef GIT_WINDOWS_NATIVE
  static int execv_shell_cmd(const char **argv)
  {
 -      const char **nargv = prepare_shell_cmd(argv);
 -      trace_argv_printf(nargv, "trace: exec:");
 -      sane_execvp(nargv[0], (char **)nargv);
 -      free(nargv);
 +      struct argv_array nargv = ARGV_ARRAY_INIT;
 +      prepare_shell_cmd(&nargv, argv);
 +      trace_argv_printf(nargv.argv, "trace: exec:");
 +      sane_execvp(nargv.argv[0], (char **)nargv.argv);
 +      argv_array_clear(&nargv);
        return -1;
  }
  #endif
@@@ -238,7 -245,7 +238,7 @@@ static int wait_or_whine(pid_t pid, con
                error("waitpid is confused (%s)", argv0);
        } else if (WIFSIGNALED(status)) {
                code = WTERMSIG(status);
 -              if (code != SIGINT && code != SIGQUIT)
 +              if (code != SIGINT && code != SIGQUIT && code != SIGPIPE)
                        error("%s died of signal %d", argv0, code);
                /*
                 * This return value is chosen so that code & 0xff
@@@ -448,7 -455,6 +448,7 @@@ fail_pipe
  {
        int fhin = 0, fhout = 1, fherr = 2;
        const char **sargv = cmd->argv;
 +      struct argv_array nargv = ARGV_ARRAY_INIT;
  
        if (cmd->no_stdin)
                fhin = open("/dev/null", O_RDWR);
                fhout = dup(cmd->out);
  
        if (cmd->git_cmd)
 -              cmd->argv = prepare_git_cmd(cmd->argv);
 +              cmd->argv = prepare_git_cmd(&nargv, cmd->argv);
        else if (cmd->use_shell)
 -              cmd->argv = prepare_shell_cmd(cmd->argv);
 +              cmd->argv = prepare_shell_cmd(&nargv, cmd->argv);
  
        cmd->pid = mingw_spawnvpe(cmd->argv[0], cmd->argv, (char**) cmd->env,
                        cmd->dir, fhin, fhout, fherr);
        if (cmd->clean_on_exit && cmd->pid >= 0)
                mark_child_for_cleanup(cmd->pid);
  
 -      if (cmd->git_cmd)
 -              free(cmd->argv);
 -
 +      argv_array_clear(&nargv);
        cmd->argv = sargv;
        if (fhin != 0)
                close(fhin);
@@@ -625,6 -633,11 +625,11 @@@ int in_async(void
        return !pthread_equal(main_thread, pthread_self());
  }
  
+ void NORETURN async_exit(int code)
+ {
+       pthread_exit((void *)(intptr_t)code);
+ }
  #else
  
  static struct {
@@@ -670,6 -683,11 +675,11 @@@ int in_async(void
        return process_is_async;
  }
  
+ void NORETURN async_exit(int code)
+ {
+       exit(code);
+ }
  #endif
  
  int start_async(struct async *async)
@@@ -857,336 -875,3 +867,336 @@@ int capture_command(struct child_proces
        close(cmd->out);
        return finish_command(cmd);
  }
 +
 +enum child_state {
 +      GIT_CP_FREE,
 +      GIT_CP_WORKING,
 +      GIT_CP_WAIT_CLEANUP,
 +};
 +
 +struct parallel_processes {
 +      void *data;
 +
 +      int max_processes;
 +      int nr_processes;
 +
 +      get_next_task_fn get_next_task;
 +      start_failure_fn start_failure;
 +      task_finished_fn task_finished;
 +
 +      struct {
 +              enum child_state state;
 +              struct child_process process;
 +              struct strbuf err;
 +              void *data;
 +      } *children;
 +      /*
 +       * The struct pollfd is logically part of *children,
 +       * but the system call expects it as its own array.
 +       */
 +      struct pollfd *pfd;
 +
 +      unsigned shutdown : 1;
 +
 +      int output_owner;
 +      struct strbuf buffered_output; /* of finished children */
 +};
 +
 +static int default_start_failure(struct child_process *cp,
 +                               struct strbuf *err,
 +                               void *pp_cb,
 +                               void *pp_task_cb)
 +{
 +      int i;
 +
 +      strbuf_addstr(err, "Starting a child failed:");
 +      for (i = 0; cp->argv[i]; i++)
 +              strbuf_addf(err, " %s", cp->argv[i]);
 +
 +      return 0;
 +}
 +
 +static int default_task_finished(int result,
 +                               struct child_process *cp,
 +                               struct strbuf *err,
 +                               void *pp_cb,
 +                               void *pp_task_cb)
 +{
 +      int i;
 +
 +      if (!result)
 +              return 0;
 +
 +      strbuf_addf(err, "A child failed with return code %d:", result);
 +      for (i = 0; cp->argv[i]; i++)
 +              strbuf_addf(err, " %s", cp->argv[i]);
 +
 +      return 0;
 +}
 +
 +static void kill_children(struct parallel_processes *pp, int signo)
 +{
 +      int i, n = pp->max_processes;
 +
 +      for (i = 0; i < n; i++)
 +              if (pp->children[i].state == GIT_CP_WORKING)
 +                      kill(pp->children[i].process.pid, signo);
 +}
 +
 +static struct parallel_processes *pp_for_signal;
 +
 +static void handle_children_on_signal(int signo)
 +{
 +      kill_children(pp_for_signal, signo);
 +      sigchain_pop(signo);
 +      raise(signo);
 +}
 +
 +static void pp_init(struct parallel_processes *pp,
 +                  int n,
 +                  get_next_task_fn get_next_task,
 +                  start_failure_fn start_failure,
 +                  task_finished_fn task_finished,
 +                  void *data)
 +{
 +      int i;
 +
 +      if (n < 1)
 +              n = online_cpus();
 +
 +      pp->max_processes = n;
 +
 +      trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);
 +
 +      pp->data = data;
 +      if (!get_next_task)
 +              die("BUG: you need to specify a get_next_task function");
 +      pp->get_next_task = get_next_task;
 +
 +      pp->start_failure = start_failure ? start_failure : default_start_failure;
 +      pp->task_finished = task_finished ? task_finished : default_task_finished;
 +
 +      pp->nr_processes = 0;
 +      pp->output_owner = 0;
 +      pp->shutdown = 0;
 +      pp->children = xcalloc(n, sizeof(*pp->children));
 +      pp->pfd = xcalloc(n, sizeof(*pp->pfd));
 +      strbuf_init(&pp->buffered_output, 0);
 +
 +      for (i = 0; i < n; i++) {
 +              strbuf_init(&pp->children[i].err, 0);
 +              child_process_init(&pp->children[i].process);
 +              pp->pfd[i].events = POLLIN | POLLHUP;
 +              pp->pfd[i].fd = -1;
 +      }
 +
 +      pp_for_signal = pp;
 +      sigchain_push_common(handle_children_on_signal);
 +}
 +
 +static void pp_cleanup(struct parallel_processes *pp)
 +{
 +      int i;
 +
 +      trace_printf("run_processes_parallel: done");
 +      for (i = 0; i < pp->max_processes; i++) {
 +              strbuf_release(&pp->children[i].err);
 +              child_process_clear(&pp->children[i].process);
 +      }
 +
 +      free(pp->children);
 +      free(pp->pfd);
 +
 +      /*
 +       * When get_next_task added messages to the buffer in its last
 +       * iteration, the buffered output is non empty.
 +       */
 +      fputs(pp->buffered_output.buf, stderr);
 +      strbuf_release(&pp->buffered_output);
 +
 +      sigchain_pop_common();
 +}
 +
 +/* returns
 + *  0 if a new task was started.
 + *  1 if no new jobs was started (get_next_task ran out of work, non critical
 + *    problem with starting a new command)
 + * <0 no new job was started, user wishes to shutdown early. Use negative code
 + *    to signal the children.
 + */
 +static int pp_start_one(struct parallel_processes *pp)
 +{
 +      int i, code;
 +
 +      for (i = 0; i < pp->max_processes; i++)
 +              if (pp->children[i].state == GIT_CP_FREE)
 +                      break;
 +      if (i == pp->max_processes)
 +              die("BUG: bookkeeping is hard");
 +
 +      code = pp->get_next_task(&pp->children[i].process,
 +                               &pp->children[i].err,
 +                               pp->data,
 +                               &pp->children[i].data);
 +      if (!code) {
 +              strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +              strbuf_reset(&pp->children[i].err);
 +              return 1;
 +      }
 +      pp->children[i].process.err = -1;
 +      pp->children[i].process.stdout_to_stderr = 1;
 +      pp->children[i].process.no_stdin = 1;
 +
 +      if (start_command(&pp->children[i].process)) {
 +              code = pp->start_failure(&pp->children[i].process,
 +                                       &pp->children[i].err,
 +                                       pp->data,
 +                                       &pp->children[i].data);
 +              strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +              strbuf_reset(&pp->children[i].err);
 +              if (code)
 +                      pp->shutdown = 1;
 +              return code;
 +      }
 +
 +      pp->nr_processes++;
 +      pp->children[i].state = GIT_CP_WORKING;
 +      pp->pfd[i].fd = pp->children[i].process.err;
 +      return 0;
 +}
 +
 +static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 +{
 +      int i;
 +
 +      while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
 +              if (errno == EINTR)
 +                      continue;
 +              pp_cleanup(pp);
 +              die_errno("poll");
 +      }
 +
 +      /* Buffer output from all pipes. */
 +      for (i = 0; i < pp->max_processes; i++) {
 +              if (pp->children[i].state == GIT_CP_WORKING &&
 +                  pp->pfd[i].revents & (POLLIN | POLLHUP)) {
 +                      int n = strbuf_read_once(&pp->children[i].err,
 +                                               pp->children[i].process.err, 0);
 +                      if (n == 0) {
 +                              close(pp->children[i].process.err);
 +                              pp->children[i].state = GIT_CP_WAIT_CLEANUP;
 +                      } else if (n < 0)
 +                              if (errno != EAGAIN)
 +                                      die_errno("read");
 +              }
 +      }
 +}
 +
 +static void pp_output(struct parallel_processes *pp)
 +{
 +      int i = pp->output_owner;
 +      if (pp->children[i].state == GIT_CP_WORKING &&
 +          pp->children[i].err.len) {
 +              fputs(pp->children[i].err.buf, stderr);
 +              strbuf_reset(&pp->children[i].err);
 +      }
 +}
 +
 +static int pp_collect_finished(struct parallel_processes *pp)
 +{
 +      int i, code;
 +      int n = pp->max_processes;
 +      int result = 0;
 +
 +      while (pp->nr_processes > 0) {
 +              for (i = 0; i < pp->max_processes; i++)
 +                      if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
 +                              break;
 +              if (i == pp->max_processes)
 +                      break;
 +
 +              code = finish_command(&pp->children[i].process);
 +
 +              code = pp->task_finished(code, &pp->children[i].process,
 +                                       &pp->children[i].err, pp->data,
 +                                       &pp->children[i].data);
 +
 +              if (code)
 +                      result = code;
 +              if (code < 0)
 +                      break;
 +
 +              pp->nr_processes--;
 +              pp->children[i].state = GIT_CP_FREE;
 +              pp->pfd[i].fd = -1;
 +              child_process_init(&pp->children[i].process);
 +
 +              if (i != pp->output_owner) {
 +                      strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +                      strbuf_reset(&pp->children[i].err);
 +              } else {
 +                      fputs(pp->children[i].err.buf, stderr);
 +                      strbuf_reset(&pp->children[i].err);
 +
 +                      /* Output all other finished child processes */
 +                      fputs(pp->buffered_output.buf, stderr);
 +                      strbuf_reset(&pp->buffered_output);
 +
 +                      /*
 +                       * Pick next process to output live.
 +                       * NEEDSWORK:
 +                       * For now we pick it randomly by doing a round
 +                       * robin. Later we may want to pick the one with
 +                       * the most output or the longest or shortest
 +                       * running process time.
 +                       */
 +                      for (i = 0; i < n; i++)
 +                              if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
 +                                      break;
 +                      pp->output_owner = (pp->output_owner + i) % n;
 +              }
 +      }
 +      return result;
 +}
 +
 +int run_processes_parallel(int n,
 +                         get_next_task_fn get_next_task,
 +                         start_failure_fn start_failure,
 +                         task_finished_fn task_finished,
 +                         void *pp_cb)
 +{
 +      int i, code;
 +      int output_timeout = 100;
 +      int spawn_cap = 4;
 +      struct parallel_processes pp;
 +
 +      pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
 +      while (1) {
 +              for (i = 0;
 +                  i < spawn_cap && !pp.shutdown &&
 +                  pp.nr_processes < pp.max_processes;
 +                  i++) {
 +                      code = pp_start_one(&pp);
 +                      if (!code)
 +                              continue;
 +                      if (code < 0) {
 +                              pp.shutdown = 1;
 +                              kill_children(&pp, -code);
 +                      }
 +                      break;
 +              }
 +              if (!pp.nr_processes)
 +                      break;
 +              pp_buffer_stderr(&pp, output_timeout);
 +              pp_output(&pp);
 +              code = pp_collect_finished(&pp);
 +              if (code) {
 +                      pp.shutdown = 1;
 +                      if (code < 0)
 +                              kill_children(&pp, -code);
 +              }
 +      }
 +
 +      pp_cleanup(&pp);
 +      return 0;
 +}
diff --combined run-command.h
index d5a57f922781de57d8ee6ae56f90af1456ccd5d5,c0969c7695f6b550b382b5704ceb9b2c23b3e3e0..42917e8618ea469ff56d52704794b6e3bc86cf97
@@@ -121,85 -121,6 +121,86 @@@ struct async 
  int start_async(struct async *async);
  int finish_async(struct async *async);
  int in_async(void);
+ void NORETURN async_exit(int code);
  
 +/**
 + * This callback should initialize the child process and preload the
 + * error channel if desired. The preloading of is useful if you want to
 + * have a message printed directly before the output of the child process.
 + * pp_cb is the callback cookie as passed to run_processes_parallel.
 + * You can store a child process specific callback cookie in pp_task_cb.
 + *
 + * Even after returning 0 to indicate that there are no more processes,
 + * this function will be called again until there are no more running
 + * child processes.
 + *
 + * Return 1 if the next child is ready to run.
 + * Return 0 if there are currently no more tasks to be processed.
 + * To send a signal to other child processes for abortion,
 + * return the negative signal number.
 + */
 +typedef int (*get_next_task_fn)(struct child_process *cp,
 +                              struct strbuf *err,
 +                              void *pp_cb,
 +                              void **pp_task_cb);
 +
 +/**
 + * This callback is called whenever there are problems starting
 + * a new process.
 + *
 + * You must not write to stdout or stderr in this function. Add your
 + * message to the strbuf err instead, which will be printed without
 + * messing up the output of the other parallel processes.
 + *
 + * pp_cb is the callback cookie as passed into run_processes_parallel,
 + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
 + *
 + * Return 0 to continue the parallel processing. To abort return non zero.
 + * To send a signal to other child processes for abortion, return
 + * the negative signal number.
 + */
 +typedef int (*start_failure_fn)(struct child_process *cp,
 +                              struct strbuf *err,
 +                              void *pp_cb,
 +                              void *pp_task_cb);
 +
 +/**
 + * This callback is called on every child process that finished processing.
 + *
 + * You must not write to stdout or stderr in this function. Add your
 + * message to the strbuf err instead, which will be printed without
 + * messing up the output of the other parallel processes.
 + *
 + * pp_cb is the callback cookie as passed into run_processes_parallel,
 + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
 + *
 + * Return 0 to continue the parallel processing.  To abort return non zero.
 + * To send a signal to other child processes for abortion, return
 + * the negative signal number.
 + */
 +typedef int (*task_finished_fn)(int result,
 +                              struct child_process *cp,
 +                              struct strbuf *err,
 +                              void *pp_cb,
 +                              void *pp_task_cb);
 +
 +/**
 + * Runs up to n processes at the same time. Whenever a process can be
 + * started, the callback get_next_task_fn is called to obtain the data
 + * required to start another child process.
 + *
 + * The children started via this function run in parallel. Their output
 + * (both stdout and stderr) is routed to stderr in a manner that output
 + * from different tasks does not interleave.
 + *
 + * If start_failure_fn or task_finished_fn are NULL, default handlers
 + * will be used. The default handlers will print an error message on
 + * error without issuing an emergency stop.
 + */
 +int run_processes_parallel(int n,
 +                         get_next_task_fn,
 +                         start_failure_fn,
 +                         task_finished_fn,
 +                         void *pp_cb);
 +
  #endif