Merge branch 'jk/push-client-deadlock-fix' into HEAD
authorJunio C Hamano <gitster@pobox.com>
Wed, 18 May 2016 21:40:06 +0000 (14:40 -0700)
committerJunio C Hamano <gitster@pobox.com>
Wed, 18 May 2016 21:40:06 +0000 (14:40 -0700)
Some Windows SDK lacks pthread_sigmask() implementation and fails
to compile the recently updated "git push" codepath that uses it.

* jk/push-client-deadlock-fix:
Windows: only add a no-op pthread_sigmask() when needed
Windows: add pthread_sigmask() that does nothing
t5504: drop sigpipe=ok from push tests
fetch-pack: isolate sigpipe in demuxer thread
send-pack: isolate sigpipe in demuxer thread
run-command: teach async threads to ignore SIGPIPE
send-pack: close demux pipe before finishing async process

1  2 
compat/mingw.h
compat/win32/pthread.h
run-command.c
run-command.h
diff --combined compat/mingw.h
index 1de70ffd62a63070ca79d27ad0b323dfbcacb5aa,c26b6e2958c456647e96d98bfd79275417cc3e14..edec9e0253560445db6863ab3ebdd7307e486339
@@@ -1,43 -1,27 +1,43 @@@
 +#ifdef __MINGW64_VERSION_MAJOR
 +#include <stdint.h>
 +#include <wchar.h>
 +typedef _sigset_t sigset_t;
 +#endif
  #include <winsock2.h>
  #include <ws2tcpip.h>
  
 +/* MinGW-w64 reports to have flockfile, but it does not actually have it. */
 +#ifdef __MINGW64_VERSION_MAJOR
 +#undef _POSIX_THREAD_SAFE_FUNCTIONS
 +#endif
 +
  /*
   * things that are not available in header files
   */
  
 -typedef int pid_t;
  typedef int uid_t;
  typedef int socklen_t;
 +#ifndef __MINGW64_VERSION_MAJOR
 +typedef int pid_t;
  #define hstrerror strerror
 +#endif
  
  #define S_IFLNK    0120000 /* Symbolic link */
  #define S_ISLNK(x) (((x) & S_IFMT) == S_IFLNK)
  #define S_ISSOCK(x) 0
  
 +#ifndef S_IRWXG
  #define S_IRGRP 0
  #define S_IWGRP 0
  #define S_IXGRP 0
  #define S_IRWXG (S_IRGRP | S_IWGRP | S_IXGRP)
 +#endif
 +#ifndef S_IRWXO
  #define S_IROTH 0
  #define S_IWOTH 0
  #define S_IXOTH 0
  #define S_IRWXO (S_IROTH | S_IWOTH | S_IXOTH)
 +#endif
  
  #define S_ISUID 0004000
  #define S_ISGID 0002000
@@@ -116,10 -100,8 +116,10 @@@ static inline int symlink(const char *o
  { errno = ENOSYS; return -1; }
  static inline int fchmod(int fildes, mode_t mode)
  { errno = ENOSYS; return -1; }
 +#ifndef __MINGW64_VERSION_MAJOR
  static inline pid_t fork(void)
  { errno = ENOSYS; return -1; }
 +#endif
  static inline unsigned int alarm(unsigned int seconds)
  { return 0; }
  static inline int fsync(int fd)
@@@ -142,6 -124,7 +142,7 @@@ static inline int fcntl(int fd, int cmd
  #define sigemptyset(x) (void)0
  static inline int sigaddset(sigset_t *set, int signum)
  { return 0; }
+ #define SIG_BLOCK 0
  #define SIG_UNBLOCK 0
  static inline int sigprocmask(int how, const sigset_t *set, sigset_t *oldset)
  { return 0; }
@@@ -194,10 -177,8 +195,10 @@@ int pipe(int filedes[2])
  unsigned int sleep (unsigned int seconds);
  int mkstemp(char *template);
  int gettimeofday(struct timeval *tv, void *tz);
 +#ifndef __MINGW64_VERSION_MAJOR
  struct tm *gmtime_r(const time_t *timep, struct tm *result);
  struct tm *localtime_r(const time_t *timep, struct tm *result);
 +#endif
  int getpagesize(void);        /* defined in MinGW's libgcc.a */
  struct passwd *getpwuid(uid_t uid);
  int setitimer(int type, struct itimerval *in, struct itimerval *out);
@@@ -321,10 -302,8 +322,10 @@@ static inline int getrlimit(int resourc
  /*
   * Use mingw specific stat()/lstat()/fstat() implementations on Windows.
   */
 +#ifndef __MINGW64_VERSION_MAJOR
  #define off_t off64_t
  #define lseek _lseeki64
 +#endif
  
  /* use struct stat with 64 bit st_size */
  #ifdef stat
@@@ -396,22 -375,12 +397,22 @@@ static inline char *mingw_find_last_dir
                        ret = (char *)path;
        return ret;
  }
 +static inline void convert_slashes(char *path)
 +{
 +      for (; *path; path++)
 +              if (*path == '\\')
 +                      *path = '/';
 +}
  #define find_last_dir_sep mingw_find_last_dir_sep
  int mingw_offset_1st_component(const char *path);
  #define offset_1st_component mingw_offset_1st_component
  #define PATH_SEP ';'
 +#if !defined(__MINGW64_VERSION_MAJOR) && (!defined(_MSC_VER) || _MSC_VER < 1800)
  #define PRIuMAX "I64u"
  #define PRId64 "I64d"
 +#else
 +#include <inttypes.h>
 +#endif
  
  void mingw_open_html(const char *path);
  #define open_html mingw_open_html
diff --combined compat/win32/pthread.h
index b6ed9e74621fe0bc506beb893e477bff26e88756,7eac560d85a2a10f63673e763fa47203af216f4b..1c164088fbb64d2f0143f536f3186f481d876d28
   */
  #define pthread_mutex_t CRITICAL_SECTION
  
 -#define pthread_mutex_init(a,b) (InitializeCriticalSection((a)), 0)
 +static inline int return_0(int i) {
 +      return 0;
 +}
 +#define pthread_mutex_init(a,b) return_0((InitializeCriticalSection((a)), 0))
  #define pthread_mutex_destroy(a) DeleteCriticalSection((a))
  #define pthread_mutex_lock EnterCriticalSection
  #define pthread_mutex_unlock LeaveCriticalSection
@@@ -78,9 -75,9 +78,9 @@@ extern int win32_pthread_join(pthread_
  #define pthread_equal(t1, t2) ((t1).tid == (t2).tid)
  extern pthread_t pthread_self(void);
  
 -static inline int pthread_exit(void *ret)
 +static inline void NORETURN pthread_exit(void *ret)
  {
 -      ExitThread((DWORD)ret);
 +      ExitThread((DWORD)(intptr_t)ret);
  }
  
  typedef DWORD pthread_key_t;
@@@ -104,4 -101,11 +104,11 @@@ static inline void *pthread_getspecific
        return TlsGetValue(key);
  }
  
+ #ifndef __MINGW64_VERSION_MAJOR
+ static inline int pthread_sigmask(int how, const sigset_t *set, sigset_t *oset)
+ {
+       return 0;
+ }
+ #endif
  #endif /* PTHREAD_H */
diff --combined run-command.c
index c72601056cf5ae7be2593ae89af4effc26a1b043,11ac060490345854171a5fde17ab922d6ba72642..2d6628012d7f3dc555d9bf385e990ac2c4ec6885
@@@ -3,8 -3,6 +3,8 @@@
  #include "exec_cmd.h"
  #include "sigchain.h"
  #include "argv-array.h"
 +#include "thread-utils.h"
 +#include "strbuf.h"
  
  void child_process_init(struct child_process *child)
  {
@@@ -238,7 -236,7 +238,7 @@@ static int wait_or_whine(pid_t pid, con
                error("waitpid is confused (%s)", argv0);
        } else if (WIFSIGNALED(status)) {
                code = WTERMSIG(status);
 -              if (code != SIGINT && code != SIGQUIT)
 +              if (code != SIGINT && code != SIGQUIT && code != SIGPIPE)
                        error("%s died of signal %d", argv0, code);
                /*
                 * This return value is chosen so that code & 0xff
@@@ -590,6 -588,16 +590,16 @@@ static void *run_thread(void *data
        struct async *async = data;
        intptr_t ret;
  
+       if (async->isolate_sigpipe) {
+               sigset_t mask;
+               sigemptyset(&mask);
+               sigaddset(&mask, SIGPIPE);
+               if (pthread_sigmask(SIG_BLOCK, &mask, NULL) < 0) {
+                       ret = error("unable to block SIGPIPE in async thread");
+                       return (void *)ret;
+               }
+       }
        pthread_setspecific(async_key, async);
        ret = async->proc(async->proc_in, async->proc_out, async->data);
        return (void *)ret;
@@@ -867,318 -875,3 +877,318 @@@ int capture_command(struct child_proces
        close(cmd->out);
        return finish_command(cmd);
  }
 +
 +enum child_state {
 +      GIT_CP_FREE,
 +      GIT_CP_WORKING,
 +      GIT_CP_WAIT_CLEANUP,
 +};
 +
 +struct parallel_processes {
 +      void *data;
 +
 +      int max_processes;
 +      int nr_processes;
 +
 +      get_next_task_fn get_next_task;
 +      start_failure_fn start_failure;
 +      task_finished_fn task_finished;
 +
 +      struct {
 +              enum child_state state;
 +              struct child_process process;
 +              struct strbuf err;
 +              void *data;
 +      } *children;
 +      /*
 +       * The struct pollfd is logically part of *children,
 +       * but the system call expects it as its own array.
 +       */
 +      struct pollfd *pfd;
 +
 +      unsigned shutdown : 1;
 +
 +      int output_owner;
 +      struct strbuf buffered_output; /* of finished children */
 +};
 +
 +static int default_start_failure(struct strbuf *err,
 +                               void *pp_cb,
 +                               void *pp_task_cb)
 +{
 +      return 0;
 +}
 +
 +static int default_task_finished(int result,
 +                               struct strbuf *err,
 +                               void *pp_cb,
 +                               void *pp_task_cb)
 +{
 +      return 0;
 +}
 +
 +static void kill_children(struct parallel_processes *pp, int signo)
 +{
 +      int i, n = pp->max_processes;
 +
 +      for (i = 0; i < n; i++)
 +              if (pp->children[i].state == GIT_CP_WORKING)
 +                      kill(pp->children[i].process.pid, signo);
 +}
 +
 +static struct parallel_processes *pp_for_signal;
 +
 +static void handle_children_on_signal(int signo)
 +{
 +      kill_children(pp_for_signal, signo);
 +      sigchain_pop(signo);
 +      raise(signo);
 +}
 +
 +static void pp_init(struct parallel_processes *pp,
 +                  int n,
 +                  get_next_task_fn get_next_task,
 +                  start_failure_fn start_failure,
 +                  task_finished_fn task_finished,
 +                  void *data)
 +{
 +      int i;
 +
 +      if (n < 1)
 +              n = online_cpus();
 +
 +      pp->max_processes = n;
 +
 +      trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);
 +
 +      pp->data = data;
 +      if (!get_next_task)
 +              die("BUG: you need to specify a get_next_task function");
 +      pp->get_next_task = get_next_task;
 +
 +      pp->start_failure = start_failure ? start_failure : default_start_failure;
 +      pp->task_finished = task_finished ? task_finished : default_task_finished;
 +
 +      pp->nr_processes = 0;
 +      pp->output_owner = 0;
 +      pp->shutdown = 0;
 +      pp->children = xcalloc(n, sizeof(*pp->children));
 +      pp->pfd = xcalloc(n, sizeof(*pp->pfd));
 +      strbuf_init(&pp->buffered_output, 0);
 +
 +      for (i = 0; i < n; i++) {
 +              strbuf_init(&pp->children[i].err, 0);
 +              child_process_init(&pp->children[i].process);
 +              pp->pfd[i].events = POLLIN | POLLHUP;
 +              pp->pfd[i].fd = -1;
 +      }
 +
 +      pp_for_signal = pp;
 +      sigchain_push_common(handle_children_on_signal);
 +}
 +
 +static void pp_cleanup(struct parallel_processes *pp)
 +{
 +      int i;
 +
 +      trace_printf("run_processes_parallel: done");
 +      for (i = 0; i < pp->max_processes; i++) {
 +              strbuf_release(&pp->children[i].err);
 +              child_process_clear(&pp->children[i].process);
 +      }
 +
 +      free(pp->children);
 +      free(pp->pfd);
 +
 +      /*
 +       * When get_next_task added messages to the buffer in its last
 +       * iteration, the buffered output is non empty.
 +       */
 +      fputs(pp->buffered_output.buf, stderr);
 +      strbuf_release(&pp->buffered_output);
 +
 +      sigchain_pop_common();
 +}
 +
 +/* returns
 + *  0 if a new task was started.
 + *  1 if no new jobs was started (get_next_task ran out of work, non critical
 + *    problem with starting a new command)
 + * <0 no new job was started, user wishes to shutdown early. Use negative code
 + *    to signal the children.
 + */
 +static int pp_start_one(struct parallel_processes *pp)
 +{
 +      int i, code;
 +
 +      for (i = 0; i < pp->max_processes; i++)
 +              if (pp->children[i].state == GIT_CP_FREE)
 +                      break;
 +      if (i == pp->max_processes)
 +              die("BUG: bookkeeping is hard");
 +
 +      code = pp->get_next_task(&pp->children[i].process,
 +                               &pp->children[i].err,
 +                               pp->data,
 +                               &pp->children[i].data);
 +      if (!code) {
 +              strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +              strbuf_reset(&pp->children[i].err);
 +              return 1;
 +      }
 +      pp->children[i].process.err = -1;
 +      pp->children[i].process.stdout_to_stderr = 1;
 +      pp->children[i].process.no_stdin = 1;
 +
 +      if (start_command(&pp->children[i].process)) {
 +              code = pp->start_failure(&pp->children[i].err,
 +                                       pp->data,
 +                                       &pp->children[i].data);
 +              strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +              strbuf_reset(&pp->children[i].err);
 +              if (code)
 +                      pp->shutdown = 1;
 +              return code;
 +      }
 +
 +      pp->nr_processes++;
 +      pp->children[i].state = GIT_CP_WORKING;
 +      pp->pfd[i].fd = pp->children[i].process.err;
 +      return 0;
 +}
 +
 +static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 +{
 +      int i;
 +
 +      while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
 +              if (errno == EINTR)
 +                      continue;
 +              pp_cleanup(pp);
 +              die_errno("poll");
 +      }
 +
 +      /* Buffer output from all pipes. */
 +      for (i = 0; i < pp->max_processes; i++) {
 +              if (pp->children[i].state == GIT_CP_WORKING &&
 +                  pp->pfd[i].revents & (POLLIN | POLLHUP)) {
 +                      int n = strbuf_read_once(&pp->children[i].err,
 +                                               pp->children[i].process.err, 0);
 +                      if (n == 0) {
 +                              close(pp->children[i].process.err);
 +                              pp->children[i].state = GIT_CP_WAIT_CLEANUP;
 +                      } else if (n < 0)
 +                              if (errno != EAGAIN)
 +                                      die_errno("read");
 +              }
 +      }
 +}
 +
 +static void pp_output(struct parallel_processes *pp)
 +{
 +      int i = pp->output_owner;
 +      if (pp->children[i].state == GIT_CP_WORKING &&
 +          pp->children[i].err.len) {
 +              fputs(pp->children[i].err.buf, stderr);
 +              strbuf_reset(&pp->children[i].err);
 +      }
 +}
 +
 +static int pp_collect_finished(struct parallel_processes *pp)
 +{
 +      int i, code;
 +      int n = pp->max_processes;
 +      int result = 0;
 +
 +      while (pp->nr_processes > 0) {
 +              for (i = 0; i < pp->max_processes; i++)
 +                      if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
 +                              break;
 +              if (i == pp->max_processes)
 +                      break;
 +
 +              code = finish_command(&pp->children[i].process);
 +
 +              code = pp->task_finished(code,
 +                                       &pp->children[i].err, pp->data,
 +                                       &pp->children[i].data);
 +
 +              if (code)
 +                      result = code;
 +              if (code < 0)
 +                      break;
 +
 +              pp->nr_processes--;
 +              pp->children[i].state = GIT_CP_FREE;
 +              pp->pfd[i].fd = -1;
 +              child_process_init(&pp->children[i].process);
 +
 +              if (i != pp->output_owner) {
 +                      strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 +                      strbuf_reset(&pp->children[i].err);
 +              } else {
 +                      fputs(pp->children[i].err.buf, stderr);
 +                      strbuf_reset(&pp->children[i].err);
 +
 +                      /* Output all other finished child processes */
 +                      fputs(pp->buffered_output.buf, stderr);
 +                      strbuf_reset(&pp->buffered_output);
 +
 +                      /*
 +                       * Pick next process to output live.
 +                       * NEEDSWORK:
 +                       * For now we pick it randomly by doing a round
 +                       * robin. Later we may want to pick the one with
 +                       * the most output or the longest or shortest
 +                       * running process time.
 +                       */
 +                      for (i = 0; i < n; i++)
 +                              if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
 +                                      break;
 +                      pp->output_owner = (pp->output_owner + i) % n;
 +              }
 +      }
 +      return result;
 +}
 +
 +int run_processes_parallel(int n,
 +                         get_next_task_fn get_next_task,
 +                         start_failure_fn start_failure,
 +                         task_finished_fn task_finished,
 +                         void *pp_cb)
 +{
 +      int i, code;
 +      int output_timeout = 100;
 +      int spawn_cap = 4;
 +      struct parallel_processes pp;
 +
 +      pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
 +      while (1) {
 +              for (i = 0;
 +                  i < spawn_cap && !pp.shutdown &&
 +                  pp.nr_processes < pp.max_processes;
 +                  i++) {
 +                      code = pp_start_one(&pp);
 +                      if (!code)
 +                              continue;
 +                      if (code < 0) {
 +                              pp.shutdown = 1;
 +                              kill_children(&pp, -code);
 +                      }
 +                      break;
 +              }
 +              if (!pp.nr_processes)
 +                      break;
 +              pp_buffer_stderr(&pp, output_timeout);
 +              pp_output(&pp);
 +              code = pp_collect_finished(&pp);
 +              if (code) {
 +                      pp.shutdown = 1;
 +                      if (code < 0)
 +                              kill_children(&pp, -code);
 +              }
 +      }
 +
 +      pp_cleanup(&pp);
 +      return 0;
 +}
diff --combined run-command.h
index 3d1e59e26e33d062a10698fc139f7fc0f4ae14ec,a3043b16dfffdae5662aef2b0bef1c1b91fc7606..49ba764c6c1d6b6d14e2e1cd56f3a5b6e8a37c12
@@@ -116,6 -116,7 +116,7 @@@ struct async 
        int proc_in;
        int proc_out;
  #endif
+       int isolate_sigpipe;
  };
  
  int start_async(struct async *async);
@@@ -123,81 -124,4 +124,81 @@@ int finish_async(struct async *async)
  int in_async(void);
  void NORETURN async_exit(int code);
  
 +/**
 + * This callback should initialize the child process and preload the
 + * error channel if desired. The preloading of is useful if you want to
 + * have a message printed directly before the output of the child process.
 + * pp_cb is the callback cookie as passed to run_processes_parallel.
 + * You can store a child process specific callback cookie in pp_task_cb.
 + *
 + * Even after returning 0 to indicate that there are no more processes,
 + * this function will be called again until there are no more running
 + * child processes.
 + *
 + * Return 1 if the next child is ready to run.
 + * Return 0 if there are currently no more tasks to be processed.
 + * To send a signal to other child processes for abortion,
 + * return the negative signal number.
 + */
 +typedef int (*get_next_task_fn)(struct child_process *cp,
 +                              struct strbuf *err,
 +                              void *pp_cb,
 +                              void **pp_task_cb);
 +
 +/**
 + * This callback is called whenever there are problems starting
 + * a new process.
 + *
 + * You must not write to stdout or stderr in this function. Add your
 + * message to the strbuf err instead, which will be printed without
 + * messing up the output of the other parallel processes.
 + *
 + * pp_cb is the callback cookie as passed into run_processes_parallel,
 + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
 + *
 + * Return 0 to continue the parallel processing. To abort return non zero.
 + * To send a signal to other child processes for abortion, return
 + * the negative signal number.
 + */
 +typedef int (*start_failure_fn)(struct strbuf *err,
 +                              void *pp_cb,
 +                              void *pp_task_cb);
 +
 +/**
 + * This callback is called on every child process that finished processing.
 + *
 + * You must not write to stdout or stderr in this function. Add your
 + * message to the strbuf err instead, which will be printed without
 + * messing up the output of the other parallel processes.
 + *
 + * pp_cb is the callback cookie as passed into run_processes_parallel,
 + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
 + *
 + * Return 0 to continue the parallel processing.  To abort return non zero.
 + * To send a signal to other child processes for abortion, return
 + * the negative signal number.
 + */
 +typedef int (*task_finished_fn)(int result,
 +                              struct strbuf *err,
 +                              void *pp_cb,
 +                              void *pp_task_cb);
 +
 +/**
 + * Runs up to n processes at the same time. Whenever a process can be
 + * started, the callback get_next_task_fn is called to obtain the data
 + * required to start another child process.
 + *
 + * The children started via this function run in parallel. Their output
 + * (both stdout and stderr) is routed to stderr in a manner that output
 + * from different tasks does not interleave.
 + *
 + * start_failure_fn and task_finished_fn can be NULL to omit any
 + * special handling.
 + */
 +int run_processes_parallel(int n,
 +                         get_next_task_fn,
 +                         start_failure_fn,
 +                         task_finished_fn,
 +                         void *pp_cb);
 +
  #endif