Merge branch 'jk/push-client-deadlock-fix'
authorJunio C Hamano <gitster@pobox.com>
Fri, 29 Apr 2016 19:59:08 +0000 (12:59 -0700)
committerJunio C Hamano <gitster@pobox.com>
Fri, 29 Apr 2016 19:59:08 +0000 (12:59 -0700)
"git push" from a corrupt repository that attempts to push a large
number of refs deadlocked; the thread to relay rejection notices
for these ref updates blocked on writing them to the main thread,
after the main thread at the receiving end notices that the push
failed and decides not to read these notices and return a failure.

* jk/push-client-deadlock-fix:
t5504: drop sigpipe=ok from push tests
fetch-pack: isolate sigpipe in demuxer thread
send-pack: isolate sigpipe in demuxer thread
run-command: teach async threads to ignore SIGPIPE
send-pack: close demux pipe before finishing async process

1  2 
run-command.c
run-command.h
diff --combined run-command.c
index 8c7115ade496e5bbaf1b584199da8f3ab21fa087,11ac060490345854171a5fde17ab922d6ba72642..e4593cd99b13fdbfec02939526ac151270aa20f5
@@@ -3,8 -3,6 +3,8 @@@
  #include "exec_cmd.h"
  #include "sigchain.h"
  #include "argv-array.h"
 +#include "thread-utils.h"
 +#include "strbuf.h"
  
  void child_process_init(struct child_process *child)
  {
@@@ -238,7 -236,7 +238,7 @@@ static int wait_or_whine(pid_t pid, con
                error("waitpid is confused (%s)", argv0);
        } else if (WIFSIGNALED(status)) {
                code = WTERMSIG(status);
 -              if (code != SIGINT && code != SIGQUIT)
 +              if (code != SIGINT && code != SIGQUIT && code != SIGPIPE)
                        error("%s died of signal %d", argv0, code);
                /*
                 * This return value is chosen so that code & 0xff
@@@ -590,6 -588,16 +590,16 @@@ static void *run_thread(void *data
        struct async *async = data;
        intptr_t ret;
  
+       if (async->isolate_sigpipe) {
+               sigset_t mask;
+               sigemptyset(&mask);
+               sigaddset(&mask, SIGPIPE);
+               if (pthread_sigmask(SIG_BLOCK, &mask, NULL) < 0) {
+                       ret = error("unable to block SIGPIPE in async thread");
+                       return (void *)ret;
+               }
+       }
        pthread_setspecific(async_key, async);
        ret = async->proc(async->proc_in, async->proc_out, async->data);
        return (void *)ret;
@@@ -867,318 -875,3 +877,318 @@@ int capture_command(struct child_proces
        close(cmd->out);
        return finish_command(cmd);
  }
 +
 +enum child_state {
 +      GIT_CP_FREE,
 +      GIT_CP_WORKING,
 +      GIT_CP_WAIT_CLEANUP,
 +};
 +
 +struct parallel_processes {
 +      void *data;
 +
 +      int max_processes;
 +      int nr_processes;
 +
 +      get_next_task_fn get_next_task;
 +      start_failure_fn start_failure;
 +      task_finished_fn task_finished;
 +
 +      struct {
 +              enum child_state state;
 +              struct child_process process;
 +              struct strbuf err;
 +              void *data;
 +      } *children;
 +      /*
 +       * The struct pollfd is logically part of *children,
 +       * but the system call expects it as its own array.
 +       */
 +      struct pollfd *pfd;
 +
 +      unsigned shutdown : 1;
 +
 +      int output_owner;
 +      struct strbuf buffered_output; /* of finished children */
 +};
 +
 +static int default_start_failure(struct strbuf *out,
 +                               void *pp_cb,
 +                               void *pp_task_cb)
 +{
 +      return 0;
 +}
 +
 +static int default_task_finished(int result,
 +                               struct strbuf *out,
 +                               void *pp_cb,
 +                               void *pp_task_cb)
 +{
 +      return 0;
 +}
 +
 +static void kill_children(struct parallel_processes *pp, int signo)
 +{
 +      int i, n = pp->max_processes;
 +
 +      for (i = 0; i < n; i++)
 +              if (pp->children[i].state == GIT_CP_WORKING)
 +                      kill(pp->children[i].process.pid, signo);
 +}
 +
 +static struct parallel_processes *pp_for_signal;
 +
 +static void handle_children_on_signal(int signo)
 +{
 +      kill_children(pp_for_signal, signo);
 +      sigchain_pop(signo);
 +      raise(signo);
 +}
 +
 +static void pp_init(struct parallel_processes *pp,
 +                  int n,
 +                  get_next_task_fn get_next_task,
 +                  start_failure_fn start_failure,
 +                  task_finished_fn task_finished,
 +                  void *data)
 +{
 +      int i;
 +
 +      if (n < 1)
 +              n = online_cpus();
 +
 +      pp->max_processes = n;
 +
 +      trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);
 +
 +      pp->data = data;
 +      if (!get_next_task)
 +              die("BUG: you need to specify a get_next_task function");
 +      pp->get_next_task = get_next_task;
 +
 +      pp->start_failure = start_failure ? start_failure : default_start_failure;
 +      pp->task_finished = task_finished ? task_finished : default_task_finished;
 +
 +      pp->nr_processes = 0;
 +      pp->output_owner = 0;
 +      pp->shutdown = 0;
 +      pp->children = xcalloc(n, sizeof(*pp->children));
 +      pp->pfd = xcalloc(n, sizeof(*pp->pfd));
 +      strbuf_init(&pp->buffered_output, 0);
 +
 +      for (i = 0; i < n; i++) {
 +              strbuf_init(&pp->children[i].err, 0);
 +              child_process_init(&pp->children[i].process);
 +              pp->pfd[i].events = POLLIN | POLLHUP;
 +              pp->pfd[i].fd = -1;
 +      }
 +
 +      pp_for_signal = pp;
 +      sigchain_push_common(handle_children_on_signal);
 +}
 +
 +static void pp_cleanup(struct parallel_processes *pp)
 +{
 +      int i;
 +
 +      trace_printf("run_processes_parallel: done");
 +      for (i = 0; i < pp->max_processes; i++) {
 +              strbuf_release(&pp->children[i].err);
 +              child_process_clear(&pp->children[i].process);
 +      }
 +
 +      free(pp->children);
 +      free(pp->pfd);
 +
 +      /*
 +       * When get_next_task added messages to the buffer in its last
 +       * iteration, the buffered output is non empty.
 +       */
 +      strbuf_write(&pp->buffered_output, stderr);
 +      strbuf_release(&pp->buffered_output);
 +
 +      sigchain_pop_common();
 +}
 +
 +/* returns
 + *  0 if a new task was started.
 + *  1 if no new jobs was started (get_next_task ran out of work, non critical
 + *    problem with starting a new command)
 + * <0 no new job was started, user wishes to shutdown early. Use negative code
 + *    to signal the children.
 + */
 +static int pp_start_one(struct parallel_processes *pp)
 +{
 +      int i, code;
 +
 +      for (i = 0; i < pp->max_processes; i++)
 +              if (pp->children[i].state == GIT_CP_FREE)
 +                      break;
 +      if (i == pp->max_processes)
 +              die("BUG: bookkeeping is hard");
 +
 +      code = pp->get_next_task(&pp->children[i].process,
 +                               &pp->children[i].err,
 +                               pp->data,
 +                               &pp->children[i].data);
 +      if (!code) {
 +              strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +              strbuf_reset(&pp->children[i].err);
 +              return 1;
 +      }
 +      pp->children[i].process.err = -1;
 +      pp->children[i].process.stdout_to_stderr = 1;
 +      pp->children[i].process.no_stdin = 1;
 +
 +      if (start_command(&pp->children[i].process)) {
 +              code = pp->start_failure(&pp->children[i].err,
 +                                       pp->data,
 +                                       &pp->children[i].data);
 +              strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +              strbuf_reset(&pp->children[i].err);
 +              if (code)
 +                      pp->shutdown = 1;
 +              return code;
 +      }
 +
 +      pp->nr_processes++;
 +      pp->children[i].state = GIT_CP_WORKING;
 +      pp->pfd[i].fd = pp->children[i].process.err;
 +      return 0;
 +}
 +
 +static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 +{
 +      int i;
 +
 +      while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
 +              if (errno == EINTR)
 +                      continue;
 +              pp_cleanup(pp);
 +              die_errno("poll");
 +      }
 +
 +      /* Buffer output from all pipes. */
 +      for (i = 0; i < pp->max_processes; i++) {
 +              if (pp->children[i].state == GIT_CP_WORKING &&
 +                  pp->pfd[i].revents & (POLLIN | POLLHUP)) {
 +                      int n = strbuf_read_once(&pp->children[i].err,
 +                                               pp->children[i].process.err, 0);
 +                      if (n == 0) {
 +                              close(pp->children[i].process.err);
 +                              pp->children[i].state = GIT_CP_WAIT_CLEANUP;
 +                      } else if (n < 0)
 +                              if (errno != EAGAIN)
 +                                      die_errno("read");
 +              }
 +      }
 +}
 +
 +static void pp_output(struct parallel_processes *pp)
 +{
 +      int i = pp->output_owner;
 +      if (pp->children[i].state == GIT_CP_WORKING &&
 +          pp->children[i].err.len) {
 +              strbuf_write(&pp->children[i].err, stderr);
 +              strbuf_reset(&pp->children[i].err);
 +      }
 +}
 +
 +static int pp_collect_finished(struct parallel_processes *pp)
 +{
 +      int i, code;
 +      int n = pp->max_processes;
 +      int result = 0;
 +
 +      while (pp->nr_processes > 0) {
 +              for (i = 0; i < pp->max_processes; i++)
 +                      if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
 +                              break;
 +              if (i == pp->max_processes)
 +                      break;
 +
 +              code = finish_command(&pp->children[i].process);
 +
 +              code = pp->task_finished(code,
 +                                       &pp->children[i].err, pp->data,
 +                                       &pp->children[i].data);
 +
 +              if (code)
 +                      result = code;
 +              if (code < 0)
 +                      break;
 +
 +              pp->nr_processes--;
 +              pp->children[i].state = GIT_CP_FREE;
 +              pp->pfd[i].fd = -1;
 +              child_process_init(&pp->children[i].process);
 +
 +              if (i != pp->output_owner) {
 +                      strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +                      strbuf_reset(&pp->children[i].err);
 +              } else {
 +                      strbuf_write(&pp->children[i].err, stderr);
 +                      strbuf_reset(&pp->children[i].err);
 +
 +                      /* Output all other finished child processes */
 +                      strbuf_write(&pp->buffered_output, stderr);
 +                      strbuf_reset(&pp->buffered_output);
 +
 +                      /*
 +                       * Pick next process to output live.
 +                       * NEEDSWORK:
 +                       * For now we pick it randomly by doing a round
 +                       * robin. Later we may want to pick the one with
 +                       * the most output or the longest or shortest
 +                       * running process time.
 +                       */
 +                      for (i = 0; i < n; i++)
 +                              if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
 +                                      break;
 +                      pp->output_owner = (pp->output_owner + i) % n;
 +              }
 +      }
 +      return result;
 +}
 +
 +int run_processes_parallel(int n,
 +                         get_next_task_fn get_next_task,
 +                         start_failure_fn start_failure,
 +                         task_finished_fn task_finished,
 +                         void *pp_cb)
 +{
 +      int i, code;
 +      int output_timeout = 100;
 +      int spawn_cap = 4;
 +      struct parallel_processes pp;
 +
 +      pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
 +      while (1) {
 +              for (i = 0;
 +                  i < spawn_cap && !pp.shutdown &&
 +                  pp.nr_processes < pp.max_processes;
 +                  i++) {
 +                      code = pp_start_one(&pp);
 +                      if (!code)
 +                              continue;
 +                      if (code < 0) {
 +                              pp.shutdown = 1;
 +                              kill_children(&pp, -code);
 +                      }
 +                      break;
 +              }
 +              if (!pp.nr_processes)
 +                      break;
 +              pp_buffer_stderr(&pp, output_timeout);
 +              pp_output(&pp);
 +              code = pp_collect_finished(&pp);
 +              if (code) {
 +                      pp.shutdown = 1;
 +                      if (code < 0)
 +                              kill_children(&pp, -code);
 +              }
 +      }
 +
 +      pp_cleanup(&pp);
 +      return 0;
 +}
diff --combined run-command.h
index de1727efab9d7e57a41626ac8bd89e5149412dc8,a3043b16dfffdae5662aef2b0bef1c1b91fc7606..11f76b04edf65fe6117604782c220e55b2d80661
@@@ -116,6 -116,7 +116,7 @@@ struct async 
        int proc_in;
        int proc_out;
  #endif
+       int isolate_sigpipe;
  };
  
  int start_async(struct async *async);
@@@ -123,81 -124,4 +124,81 @@@ int finish_async(struct async *async)
  int in_async(void);
  void NORETURN async_exit(int code);
  
 +/**
 + * This callback should initialize the child process and preload the
 + * error channel if desired. The preloading of is useful if you want to
 + * have a message printed directly before the output of the child process.
 + * pp_cb is the callback cookie as passed to run_processes_parallel.
 + * You can store a child process specific callback cookie in pp_task_cb.
 + *
 + * Even after returning 0 to indicate that there are no more processes,
 + * this function will be called again until there are no more running
 + * child processes.
 + *
 + * Return 1 if the next child is ready to run.
 + * Return 0 if there are currently no more tasks to be processed.
 + * To send a signal to other child processes for abortion,
 + * return the negative signal number.
 + */
 +typedef int (*get_next_task_fn)(struct child_process *cp,
 +                              struct strbuf *out,
 +                              void *pp_cb,
 +                              void **pp_task_cb);
 +
 +/**
 + * This callback is called whenever there are problems starting
 + * a new process.
 + *
 + * You must not write to stdout or stderr in this function. Add your
 + * message to the strbuf out instead, which will be printed without
 + * messing up the output of the other parallel processes.
 + *
 + * pp_cb is the callback cookie as passed into run_processes_parallel,
 + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
 + *
 + * Return 0 to continue the parallel processing. To abort return non zero.
 + * To send a signal to other child processes for abortion, return
 + * the negative signal number.
 + */
 +typedef int (*start_failure_fn)(struct strbuf *out,
 +                              void *pp_cb,
 +                              void *pp_task_cb);
 +
 +/**
 + * This callback is called on every child process that finished processing.
 + *
 + * You must not write to stdout or stderr in this function. Add your
 + * message to the strbuf out instead, which will be printed without
 + * messing up the output of the other parallel processes.
 + *
 + * pp_cb is the callback cookie as passed into run_processes_parallel,
 + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
 + *
 + * Return 0 to continue the parallel processing.  To abort return non zero.
 + * To send a signal to other child processes for abortion, return
 + * the negative signal number.
 + */
 +typedef int (*task_finished_fn)(int result,
 +                              struct strbuf *out,
 +                              void *pp_cb,
 +                              void *pp_task_cb);
 +
 +/**
 + * Runs up to n processes at the same time. Whenever a process can be
 + * started, the callback get_next_task_fn is called to obtain the data
 + * required to start another child process.
 + *
 + * The children started via this function run in parallel. Their output
 + * (both stdout and stderr) is routed to stderr in a manner that output
 + * from different tasks does not interleave.
 + *
 + * start_failure_fn and task_finished_fn can be NULL to omit any
 + * special handling.
 + */
 +int run_processes_parallel(int n,
 +                         get_next_task_fn,
 +                         start_failure_fn,
 +                         task_finished_fn,
 +                         void *pp_cb);
 +
  #endif