From: Junio C Hamano Date: Wed, 18 May 2016 21:40:06 +0000 (-0700) Subject: Merge branch 'jk/push-client-deadlock-fix' into HEAD X-Git-Tag: v2.8.3~19 X-Git-Url: https://git.lorimer.id.au/gitweb.git/diff_plain/c555e529ac1d29ed98229698d38f77ebb684a017?hp=-c Merge branch 'jk/push-client-deadlock-fix' into HEAD Some Windows SDK lacks pthread_sigmask() implementation and fails to compile the recently updated "git push" codepath that uses it. * jk/push-client-deadlock-fix: Windows: only add a no-op pthread_sigmask() when needed Windows: add pthread_sigmask() that does nothing t5504: drop sigpipe=ok from push tests fetch-pack: isolate sigpipe in demuxer thread send-pack: isolate sigpipe in demuxer thread run-command: teach async threads to ignore SIGPIPE send-pack: close demux pipe before finishing async process --- c555e529ac1d29ed98229698d38f77ebb684a017 diff --combined compat/mingw.h index 1de70ffd62,c26b6e2958..edec9e0253 --- a/compat/mingw.h +++ b/compat/mingw.h @@@ -1,43 -1,27 +1,43 @@@ +#ifdef __MINGW64_VERSION_MAJOR +#include +#include +typedef _sigset_t sigset_t; +#endif #include #include +/* MinGW-w64 reports to have flockfile, but it does not actually have it. */ +#ifdef __MINGW64_VERSION_MAJOR +#undef _POSIX_THREAD_SAFE_FUNCTIONS +#endif + /* * things that are not available in header files */ -typedef int pid_t; typedef int uid_t; typedef int socklen_t; +#ifndef __MINGW64_VERSION_MAJOR +typedef int pid_t; #define hstrerror strerror +#endif #define S_IFLNK 0120000 /* Symbolic link */ #define S_ISLNK(x) (((x) & S_IFMT) == S_IFLNK) #define S_ISSOCK(x) 0 +#ifndef S_IRWXG #define S_IRGRP 0 #define S_IWGRP 0 #define S_IXGRP 0 #define S_IRWXG (S_IRGRP | S_IWGRP | S_IXGRP) +#endif +#ifndef S_IRWXO #define S_IROTH 0 #define S_IWOTH 0 #define S_IXOTH 0 #define S_IRWXO (S_IROTH | S_IWOTH | S_IXOTH) +#endif #define S_ISUID 0004000 #define S_ISGID 0002000 @@@ -116,10 -100,8 +116,10 @@@ static inline int symlink(const char *o { errno = ENOSYS; return -1; } static inline int fchmod(int fildes, mode_t mode) { errno = ENOSYS; return -1; } +#ifndef __MINGW64_VERSION_MAJOR static inline pid_t fork(void) { errno = ENOSYS; return -1; } +#endif static inline unsigned int alarm(unsigned int seconds) { return 0; } static inline int fsync(int fd) @@@ -142,6 -124,7 +142,7 @@@ static inline int fcntl(int fd, int cmd #define sigemptyset(x) (void)0 static inline int sigaddset(sigset_t *set, int signum) { return 0; } + #define SIG_BLOCK 0 #define SIG_UNBLOCK 0 static inline int sigprocmask(int how, const sigset_t *set, sigset_t *oldset) { return 0; } @@@ -194,10 -177,8 +195,10 @@@ int pipe(int filedes[2]) unsigned int sleep (unsigned int seconds); int mkstemp(char *template); int gettimeofday(struct timeval *tv, void *tz); +#ifndef __MINGW64_VERSION_MAJOR struct tm *gmtime_r(const time_t *timep, struct tm *result); struct tm *localtime_r(const time_t *timep, struct tm *result); +#endif int getpagesize(void); /* defined in MinGW's libgcc.a */ struct passwd *getpwuid(uid_t uid); int setitimer(int type, struct itimerval *in, struct itimerval *out); @@@ -321,10 -302,8 +322,10 @@@ static inline int getrlimit(int resourc /* * Use mingw specific stat()/lstat()/fstat() implementations on Windows. */ +#ifndef __MINGW64_VERSION_MAJOR #define off_t off64_t #define lseek _lseeki64 +#endif /* use struct stat with 64 bit st_size */ #ifdef stat @@@ -396,22 -375,12 +397,22 @@@ static inline char *mingw_find_last_dir ret = (char *)path; return ret; } +static inline void convert_slashes(char *path) +{ + for (; *path; path++) + if (*path == '\\') + *path = '/'; +} #define find_last_dir_sep mingw_find_last_dir_sep int mingw_offset_1st_component(const char *path); #define offset_1st_component mingw_offset_1st_component #define PATH_SEP ';' +#if !defined(__MINGW64_VERSION_MAJOR) && (!defined(_MSC_VER) || _MSC_VER < 1800) #define PRIuMAX "I64u" #define PRId64 "I64d" +#else +#include +#endif void mingw_open_html(const char *path); #define open_html mingw_open_html diff --combined compat/win32/pthread.h index b6ed9e7462,7eac560d85..1c164088fb --- a/compat/win32/pthread.h +++ b/compat/win32/pthread.h @@@ -18,10 -18,7 +18,10 @@@ */ #define pthread_mutex_t CRITICAL_SECTION -#define pthread_mutex_init(a,b) (InitializeCriticalSection((a)), 0) +static inline int return_0(int i) { + return 0; +} +#define pthread_mutex_init(a,b) return_0((InitializeCriticalSection((a)), 0)) #define pthread_mutex_destroy(a) DeleteCriticalSection((a)) #define pthread_mutex_lock EnterCriticalSection #define pthread_mutex_unlock LeaveCriticalSection @@@ -78,9 -75,9 +78,9 @@@ extern int win32_pthread_join(pthread_ #define pthread_equal(t1, t2) ((t1).tid == (t2).tid) extern pthread_t pthread_self(void); -static inline int pthread_exit(void *ret) +static inline void NORETURN pthread_exit(void *ret) { - ExitThread((DWORD)ret); + ExitThread((DWORD)(intptr_t)ret); } typedef DWORD pthread_key_t; @@@ -104,4 -101,11 +104,11 @@@ static inline void *pthread_getspecific return TlsGetValue(key); } + #ifndef __MINGW64_VERSION_MAJOR + static inline int pthread_sigmask(int how, const sigset_t *set, sigset_t *oset) + { + return 0; + } + #endif + #endif /* PTHREAD_H */ diff --combined run-command.c index c72601056c,11ac060490..2d6628012d --- a/run-command.c +++ b/run-command.c @@@ -3,8 -3,6 +3,8 @@@ #include "exec_cmd.h" #include "sigchain.h" #include "argv-array.h" +#include "thread-utils.h" +#include "strbuf.h" void child_process_init(struct child_process *child) { @@@ -238,7 -236,7 +238,7 @@@ static int wait_or_whine(pid_t pid, con error("waitpid is confused (%s)", argv0); } else if (WIFSIGNALED(status)) { code = WTERMSIG(status); - if (code != SIGINT && code != SIGQUIT) + if (code != SIGINT && code != SIGQUIT && code != SIGPIPE) error("%s died of signal %d", argv0, code); /* * This return value is chosen so that code & 0xff @@@ -590,6 -588,16 +590,16 @@@ static void *run_thread(void *data struct async *async = data; intptr_t ret; + if (async->isolate_sigpipe) { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGPIPE); + if (pthread_sigmask(SIG_BLOCK, &mask, NULL) < 0) { + ret = error("unable to block SIGPIPE in async thread"); + return (void *)ret; + } + } + pthread_setspecific(async_key, async); ret = async->proc(async->proc_in, async->proc_out, async->data); return (void *)ret; @@@ -867,318 -875,3 +877,318 @@@ int capture_command(struct child_proces close(cmd->out); return finish_command(cmd); } + +enum child_state { + GIT_CP_FREE, + GIT_CP_WORKING, + GIT_CP_WAIT_CLEANUP, +}; + +struct parallel_processes { + void *data; + + int max_processes; + int nr_processes; + + get_next_task_fn get_next_task; + start_failure_fn start_failure; + task_finished_fn task_finished; + + struct { + enum child_state state; + struct child_process process; + struct strbuf err; + void *data; + } *children; + /* + * The struct pollfd is logically part of *children, + * but the system call expects it as its own array. + */ + struct pollfd *pfd; + + unsigned shutdown : 1; + + int output_owner; + struct strbuf buffered_output; /* of finished children */ +}; + +static int default_start_failure(struct strbuf *err, + void *pp_cb, + void *pp_task_cb) +{ + return 0; +} + +static int default_task_finished(int result, + struct strbuf *err, + void *pp_cb, + void *pp_task_cb) +{ + return 0; +} + +static void kill_children(struct parallel_processes *pp, int signo) +{ + int i, n = pp->max_processes; + + for (i = 0; i < n; i++) + if (pp->children[i].state == GIT_CP_WORKING) + kill(pp->children[i].process.pid, signo); +} + +static struct parallel_processes *pp_for_signal; + +static void handle_children_on_signal(int signo) +{ + kill_children(pp_for_signal, signo); + sigchain_pop(signo); + raise(signo); +} + +static void pp_init(struct parallel_processes *pp, + int n, + get_next_task_fn get_next_task, + start_failure_fn start_failure, + task_finished_fn task_finished, + void *data) +{ + int i; + + if (n < 1) + n = online_cpus(); + + pp->max_processes = n; + + trace_printf("run_processes_parallel: preparing to run up to %d tasks", n); + + pp->data = data; + if (!get_next_task) + die("BUG: you need to specify a get_next_task function"); + pp->get_next_task = get_next_task; + + pp->start_failure = start_failure ? start_failure : default_start_failure; + pp->task_finished = task_finished ? task_finished : default_task_finished; + + pp->nr_processes = 0; + pp->output_owner = 0; + pp->shutdown = 0; + pp->children = xcalloc(n, sizeof(*pp->children)); + pp->pfd = xcalloc(n, sizeof(*pp->pfd)); + strbuf_init(&pp->buffered_output, 0); + + for (i = 0; i < n; i++) { + strbuf_init(&pp->children[i].err, 0); + child_process_init(&pp->children[i].process); + pp->pfd[i].events = POLLIN | POLLHUP; + pp->pfd[i].fd = -1; + } + + pp_for_signal = pp; + sigchain_push_common(handle_children_on_signal); +} + +static void pp_cleanup(struct parallel_processes *pp) +{ + int i; + + trace_printf("run_processes_parallel: done"); + for (i = 0; i < pp->max_processes; i++) { + strbuf_release(&pp->children[i].err); + child_process_clear(&pp->children[i].process); + } + + free(pp->children); + free(pp->pfd); + + /* + * When get_next_task added messages to the buffer in its last + * iteration, the buffered output is non empty. + */ + fputs(pp->buffered_output.buf, stderr); + strbuf_release(&pp->buffered_output); + + sigchain_pop_common(); +} + +/* returns + * 0 if a new task was started. + * 1 if no new jobs was started (get_next_task ran out of work, non critical + * problem with starting a new command) + * <0 no new job was started, user wishes to shutdown early. Use negative code + * to signal the children. + */ +static int pp_start_one(struct parallel_processes *pp) +{ + int i, code; + + for (i = 0; i < pp->max_processes; i++) + if (pp->children[i].state == GIT_CP_FREE) + break; + if (i == pp->max_processes) + die("BUG: bookkeeping is hard"); + + code = pp->get_next_task(&pp->children[i].process, + &pp->children[i].err, + pp->data, + &pp->children[i].data); + if (!code) { + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + return 1; + } + pp->children[i].process.err = -1; + pp->children[i].process.stdout_to_stderr = 1; + pp->children[i].process.no_stdin = 1; + + if (start_command(&pp->children[i].process)) { + code = pp->start_failure(&pp->children[i].err, + pp->data, + &pp->children[i].data); + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + if (code) + pp->shutdown = 1; + return code; + } + + pp->nr_processes++; + pp->children[i].state = GIT_CP_WORKING; + pp->pfd[i].fd = pp->children[i].process.err; + return 0; +} + +static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout) +{ + int i; + + while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) { + if (errno == EINTR) + continue; + pp_cleanup(pp); + die_errno("poll"); + } + + /* Buffer output from all pipes. */ + for (i = 0; i < pp->max_processes; i++) { + if (pp->children[i].state == GIT_CP_WORKING && + pp->pfd[i].revents & (POLLIN | POLLHUP)) { + int n = strbuf_read_once(&pp->children[i].err, + pp->children[i].process.err, 0); + if (n == 0) { + close(pp->children[i].process.err); + pp->children[i].state = GIT_CP_WAIT_CLEANUP; + } else if (n < 0) + if (errno != EAGAIN) + die_errno("read"); + } + } +} + +static void pp_output(struct parallel_processes *pp) +{ + int i = pp->output_owner; + if (pp->children[i].state == GIT_CP_WORKING && + pp->children[i].err.len) { + fputs(pp->children[i].err.buf, stderr); + strbuf_reset(&pp->children[i].err); + } +} + +static int pp_collect_finished(struct parallel_processes *pp) +{ + int i, code; + int n = pp->max_processes; + int result = 0; + + while (pp->nr_processes > 0) { + for (i = 0; i < pp->max_processes; i++) + if (pp->children[i].state == GIT_CP_WAIT_CLEANUP) + break; + if (i == pp->max_processes) + break; + + code = finish_command(&pp->children[i].process); + + code = pp->task_finished(code, + &pp->children[i].err, pp->data, + &pp->children[i].data); + + if (code) + result = code; + if (code < 0) + break; + + pp->nr_processes--; + pp->children[i].state = GIT_CP_FREE; + pp->pfd[i].fd = -1; + child_process_init(&pp->children[i].process); + + if (i != pp->output_owner) { + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + } else { + fputs(pp->children[i].err.buf, stderr); + strbuf_reset(&pp->children[i].err); + + /* Output all other finished child processes */ + fputs(pp->buffered_output.buf, stderr); + strbuf_reset(&pp->buffered_output); + + /* + * Pick next process to output live. + * NEEDSWORK: + * For now we pick it randomly by doing a round + * robin. Later we may want to pick the one with + * the most output or the longest or shortest + * running process time. + */ + for (i = 0; i < n; i++) + if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING) + break; + pp->output_owner = (pp->output_owner + i) % n; + } + } + return result; +} + +int run_processes_parallel(int n, + get_next_task_fn get_next_task, + start_failure_fn start_failure, + task_finished_fn task_finished, + void *pp_cb) +{ + int i, code; + int output_timeout = 100; + int spawn_cap = 4; + struct parallel_processes pp; + + pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb); + while (1) { + for (i = 0; + i < spawn_cap && !pp.shutdown && + pp.nr_processes < pp.max_processes; + i++) { + code = pp_start_one(&pp); + if (!code) + continue; + if (code < 0) { + pp.shutdown = 1; + kill_children(&pp, -code); + } + break; + } + if (!pp.nr_processes) + break; + pp_buffer_stderr(&pp, output_timeout); + pp_output(&pp); + code = pp_collect_finished(&pp); + if (code) { + pp.shutdown = 1; + if (code < 0) + kill_children(&pp, -code); + } + } + + pp_cleanup(&pp); + return 0; +} diff --combined run-command.h index 3d1e59e26e,a3043b16df..49ba764c6c --- a/run-command.h +++ b/run-command.h @@@ -116,6 -116,7 +116,7 @@@ struct async int proc_in; int proc_out; #endif + int isolate_sigpipe; }; int start_async(struct async *async); @@@ -123,81 -124,4 +124,81 @@@ int finish_async(struct async *async) int in_async(void); void NORETURN async_exit(int code); +/** + * This callback should initialize the child process and preload the + * error channel if desired. The preloading of is useful if you want to + * have a message printed directly before the output of the child process. + * pp_cb is the callback cookie as passed to run_processes_parallel. + * You can store a child process specific callback cookie in pp_task_cb. + * + * Even after returning 0 to indicate that there are no more processes, + * this function will be called again until there are no more running + * child processes. + * + * Return 1 if the next child is ready to run. + * Return 0 if there are currently no more tasks to be processed. + * To send a signal to other child processes for abortion, + * return the negative signal number. + */ +typedef int (*get_next_task_fn)(struct child_process *cp, + struct strbuf *err, + void *pp_cb, + void **pp_task_cb); + +/** + * This callback is called whenever there are problems starting + * a new process. + * + * You must not write to stdout or stderr in this function. Add your + * message to the strbuf err instead, which will be printed without + * messing up the output of the other parallel processes. + * + * pp_cb is the callback cookie as passed into run_processes_parallel, + * pp_task_cb is the callback cookie as passed into get_next_task_fn. + * + * Return 0 to continue the parallel processing. To abort return non zero. + * To send a signal to other child processes for abortion, return + * the negative signal number. + */ +typedef int (*start_failure_fn)(struct strbuf *err, + void *pp_cb, + void *pp_task_cb); + +/** + * This callback is called on every child process that finished processing. + * + * You must not write to stdout or stderr in this function. Add your + * message to the strbuf err instead, which will be printed without + * messing up the output of the other parallel processes. + * + * pp_cb is the callback cookie as passed into run_processes_parallel, + * pp_task_cb is the callback cookie as passed into get_next_task_fn. + * + * Return 0 to continue the parallel processing. To abort return non zero. + * To send a signal to other child processes for abortion, return + * the negative signal number. + */ +typedef int (*task_finished_fn)(int result, + struct strbuf *err, + void *pp_cb, + void *pp_task_cb); + +/** + * Runs up to n processes at the same time. Whenever a process can be + * started, the callback get_next_task_fn is called to obtain the data + * required to start another child process. + * + * The children started via this function run in parallel. Their output + * (both stdout and stderr) is routed to stderr in a manner that output + * from different tasks does not interleave. + * + * start_failure_fn and task_finished_fn can be NULL to omit any + * special handling. + */ +int run_processes_parallel(int n, + get_next_task_fn, + start_failure_fn, + task_finished_fn, + void *pp_cb); + #endif