Merge branch 'sb/submodule-parallel-fetch'
author Junio C Hamano <gitster@pobox.com>
Tue, 12 Jan 2016 23:16:54 +0000 (15:16 -0800)
committer Junio C Hamano <gitster@pobox.com>
Tue, 12 Jan 2016 23:16:54 +0000 (15:16 -0800)
Add a framework to spawn a group of processes in parallel, and use
it to run "git fetch --recurse-submodules" in parallel.

Rerolled, and this round seems to be a lot cleaner. The merge of the
earlier round to 'next' has been reverted.

* sb/submodule-parallel-fetch:
submodules: allow parallel fetching, add tests and documentation
fetch_populated_submodules: use new parallel job processing
run-command: add an asynchronous parallel child processor
sigchain: add command to pop all common signals
strbuf: add strbuf_read_once to read without blocking
xread: poll on non blocking fds
submodule.c: write "Fetching submodule <foo>" to stderr

15 files changed:
Documentation/fetch-options.txt
builtin/fetch.c
builtin/pull.c
run-command.c
run-command.h
sigchain.c
sigchain.h
strbuf.c
strbuf.h
submodule.c
submodule.h
t/t0061-run-command.sh
t/t5526-fetch-submodules.sh
test-run-command.c
wrapper.c
index 45583d8454b4cae9bdb2ddb904c077cf1d3a1223..6b109f687aa27e6ad8ee4b7b665d053193975af6 100644 (file)
@@ -100,6 +100,13 @@ ifndef::git-pull[]
        reference to a commit that isn't already in the local submodule
        clone.
 
+-j::
+--jobs=<n>::
+       Number of parallel children to be used for fetching submodules.
+       Each child fetches from a different submodule, so fetching many
+       submodules is faster. By default submodules are fetched one at a
+       time.
+
 --no-recurse-submodules::
        Disable recursive fetching of submodules (this has the same effect as
        using the '--recurse-submodules=no' option).
index c85f3471d4b53ce7db7662b5531ddebe5513b7b0..586840d761a9c14142ee35cdd53e2e4f96648209 100644 (file)
@@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */
 static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity;
 static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT;
 static int tags = TAGS_DEFAULT, unshallow, update_shallow;
+static int max_children = 1;
 static const char *depth;
 static const char *upload_pack;
 static struct strbuf default_rla = STRBUF_INIT;
@@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = {
                    N_("fetch all tags and associated objects"), TAGS_SET),
        OPT_SET_INT('n', NULL, &tags,
                    N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
+       OPT_INTEGER('j', "jobs", &max_children,
+                   N_("number of submodules fetched in parallel")),
        OPT_BOOL('p', "prune", &prune,
                 N_("prune remote-tracking branches no longer on remote")),
        { OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
@@ -1213,7 +1216,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix)
                result = fetch_populated_submodules(&options,
                                                    submodule_prefix,
                                                    recurse_submodules,
-                                                   verbosity < 0);
+                                                   verbosity < 0,
+                                                   max_children);
                argv_array_clear(&options);
        }
 
index 5145fc60a0377a7fb799555892909f979aca9076..9e3c73809f83af636150cc3edd8628c339ba40c1 100644 (file)
@@ -95,6 +95,7 @@ static int opt_force;
 static char *opt_tags;
 static char *opt_prune;
 static char *opt_recurse_submodules;
+static char *max_children;
 static int opt_dry_run;
 static char *opt_keep;
 static char *opt_depth;
@@ -178,6 +179,9 @@ static struct option pull_options[] = {
                N_("on-demand"),
                N_("control recursive fetching of submodules"),
                PARSE_OPT_OPTARG),
+       OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
+               N_("number of submodules pulled in parallel"),
+               PARSE_OPT_OPTARG),
        OPT_BOOL(0, "dry-run", &opt_dry_run,
                N_("dry run")),
        OPT_PASSTHRU('k', "keep", &opt_keep, NULL,
@@ -525,6 +529,8 @@ static int run_fetch(const char *repo, const char **refspecs)
                argv_array_push(&args, opt_prune);
        if (opt_recurse_submodules)
                argv_array_push(&args, opt_recurse_submodules);
+       if (max_children)
+               argv_array_push(&args, max_children);
        if (opt_dry_run)
                argv_array_push(&args, "--dry-run");
        if (opt_keep)
index 13fa452e8c3d5a5e20e24a969d53ce7ade4019bf..51fd72c4273c3ee59246285e40c9a5f9bd0e2f98 100644 (file)
@@ -3,6 +3,8 @@
 #include "exec_cmd.h"
 #include "sigchain.h"
 #include "argv-array.h"
+#include "thread-utils.h"
+#include "strbuf.h"
 
 void child_process_init(struct child_process *child)
 {
@@ -865,3 +867,336 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
        close(cmd->out);
        return finish_command(cmd);
 }
+
+enum child_state {
+       GIT_CP_FREE,
+       GIT_CP_WORKING,
+       GIT_CP_WAIT_CLEANUP,
+};
+
+struct parallel_processes {
+       void *data;
+
+       int max_processes;
+       int nr_processes;
+
+       get_next_task_fn get_next_task;
+       start_failure_fn start_failure;
+       task_finished_fn task_finished;
+
+       struct {
+               enum child_state state;
+               struct child_process process;
+               struct strbuf err;
+               void *data;
+       } *children;
+       /*
+        * The struct pollfd is logically part of *children,
+        * but the system call expects it as its own array.
+        */
+       struct pollfd *pfd;
+
+       unsigned shutdown : 1;
+
+       int output_owner;
+       struct strbuf buffered_output; /* of finished children */
+};
+
+static int default_start_failure(struct child_process *cp,
+                                struct strbuf *err,
+                                void *pp_cb,
+                                void *pp_task_cb)
+{
+       int i;
+
+       strbuf_addstr(err, "Starting a child failed:");
+       for (i = 0; cp->argv[i]; i++)
+               strbuf_addf(err, " %s", cp->argv[i]);
+
+       return 0;
+}
+
+static int default_task_finished(int result,
+                                struct child_process *cp,
+                                struct strbuf *err,
+                                void *pp_cb,
+                                void *pp_task_cb)
+{
+       int i;
+
+       if (!result)
+               return 0;
+
+       strbuf_addf(err, "A child failed with return code %d:", result);
+       for (i = 0; cp->argv[i]; i++)
+               strbuf_addf(err, " %s", cp->argv[i]);
+
+       return 0;
+}
+
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+       int i, n = pp->max_processes;
+
+       for (i = 0; i < n; i++)
+               if (pp->children[i].state == GIT_CP_WORKING)
+                       kill(pp->children[i].process.pid, signo);
+}
+
+static struct parallel_processes *pp_for_signal;
+
+static void handle_children_on_signal(int signo)
+{
+       kill_children(pp_for_signal, signo);
+       sigchain_pop(signo);
+       raise(signo);
+}
+
+static void pp_init(struct parallel_processes *pp,
+                   int n,
+                   get_next_task_fn get_next_task,
+                   start_failure_fn start_failure,
+                   task_finished_fn task_finished,
+                   void *data)
+{
+       int i;
+
+       if (n < 1)
+               n = online_cpus();
+
+       pp->max_processes = n;
+
+       trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);
+
+       pp->data = data;
+       if (!get_next_task)
+               die("BUG: you need to specify a get_next_task function");
+       pp->get_next_task = get_next_task;
+
+       pp->start_failure = start_failure ? start_failure : default_start_failure;
+       pp->task_finished = task_finished ? task_finished : default_task_finished;
+
+       pp->nr_processes = 0;
+       pp->output_owner = 0;
+       pp->shutdown = 0;
+       pp->children = xcalloc(n, sizeof(*pp->children));
+       pp->pfd = xcalloc(n, sizeof(*pp->pfd));
+       strbuf_init(&pp->buffered_output, 0);
+
+       for (i = 0; i < n; i++) {
+               strbuf_init(&pp->children[i].err, 0);
+               child_process_init(&pp->children[i].process);
+               pp->pfd[i].events = POLLIN | POLLHUP;
+               pp->pfd[i].fd = -1;
+       }
+
+       pp_for_signal = pp;
+       sigchain_push_common(handle_children_on_signal);
+}
+
+static void pp_cleanup(struct parallel_processes *pp)
+{
+       int i;
+
+       trace_printf("run_processes_parallel: done");
+       for (i = 0; i < pp->max_processes; i++) {
+               strbuf_release(&pp->children[i].err);
+               child_process_clear(&pp->children[i].process);
+       }
+
+       free(pp->children);
+       free(pp->pfd);
+
+       /*
+        * When get_next_task added messages to the buffer in its last
+        * iteration, the buffered output is non-empty.
+        */
+       fputs(pp->buffered_output.buf, stderr);
+       strbuf_release(&pp->buffered_output);
+
+       sigchain_pop_common();
+}
+
+/* returns
+ *  0 if a new task was started.
+ *  1 if no new job was started (get_next_task ran out of work, or a
+ *    non-critical problem occurred while starting a new command)
+ * <0 if no new job was started and the user wishes to shut down early.
+ *    Use the negative code to signal the children.
+ */
+static int pp_start_one(struct parallel_processes *pp)
+{
+       int i, code;
+
+       for (i = 0; i < pp->max_processes; i++)
+               if (pp->children[i].state == GIT_CP_FREE)
+                       break;
+       if (i == pp->max_processes)
+               die("BUG: bookkeeping is hard");
+
+       code = pp->get_next_task(&pp->children[i].process,
+                                &pp->children[i].err,
+                                pp->data,
+                                &pp->children[i].data);
+       if (!code) {
+               strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+               strbuf_reset(&pp->children[i].err);
+               return 1;
+       }
+       pp->children[i].process.err = -1;
+       pp->children[i].process.stdout_to_stderr = 1;
+       pp->children[i].process.no_stdin = 1;
+
+       if (start_command(&pp->children[i].process)) {
+               code = pp->start_failure(&pp->children[i].process,
+                                        &pp->children[i].err,
+                                        pp->data,
+                                        &pp->children[i].data);
+               strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+               strbuf_reset(&pp->children[i].err);
+               if (code)
+                       pp->shutdown = 1;
+               return code;
+       }
+
+       pp->nr_processes++;
+       pp->children[i].state = GIT_CP_WORKING;
+       pp->pfd[i].fd = pp->children[i].process.err;
+       return 0;
+}
+
+static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
+{
+       int i;
+
+       while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
+               if (errno == EINTR)
+                       continue;
+               pp_cleanup(pp);
+               die_errno("poll");
+       }
+
+       /* Buffer output from all pipes. */
+       for (i = 0; i < pp->max_processes; i++) {
+               if (pp->children[i].state == GIT_CP_WORKING &&
+                   pp->pfd[i].revents & (POLLIN | POLLHUP)) {
+                       int n = strbuf_read_once(&pp->children[i].err,
+                                                pp->children[i].process.err, 0);
+                       if (n == 0) {
+                               close(pp->children[i].process.err);
+                               pp->children[i].state = GIT_CP_WAIT_CLEANUP;
+                       } else if (n < 0)
+                               if (errno != EAGAIN)
+                                       die_errno("read");
+               }
+       }
+}
+
+static void pp_output(struct parallel_processes *pp)
+{
+       int i = pp->output_owner;
+       if (pp->children[i].state == GIT_CP_WORKING &&
+           pp->children[i].err.len) {
+               fputs(pp->children[i].err.buf, stderr);
+               strbuf_reset(&pp->children[i].err);
+       }
+}
+
+static int pp_collect_finished(struct parallel_processes *pp)
+{
+       int i, code;
+       int n = pp->max_processes;
+       int result = 0;
+
+       while (pp->nr_processes > 0) {
+               for (i = 0; i < pp->max_processes; i++)
+                       if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
+                               break;
+               if (i == pp->max_processes)
+                       break;
+
+               code = finish_command(&pp->children[i].process);
+
+               code = pp->task_finished(code, &pp->children[i].process,
+                                        &pp->children[i].err, pp->data,
+                                        &pp->children[i].data);
+
+               if (code)
+                       result = code;
+               if (code < 0)
+                       break;
+
+               pp->nr_processes--;
+               pp->children[i].state = GIT_CP_FREE;
+               pp->pfd[i].fd = -1;
+               child_process_init(&pp->children[i].process);
+
+               if (i != pp->output_owner) {
+                       strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+                       strbuf_reset(&pp->children[i].err);
+               } else {
+                       fputs(pp->children[i].err.buf, stderr);
+                       strbuf_reset(&pp->children[i].err);
+
+                       /* Output all other finished child processes */
+                       fputs(pp->buffered_output.buf, stderr);
+                       strbuf_reset(&pp->buffered_output);
+
+                       /*
+                        * Pick next process to output live.
+                        * NEEDSWORK:
+                        * For now we pick the next one in round-robin
+                        * order. Later we may want to pick the one with
+                        * the most output or the longest or shortest
+                        * running process time.
+                        */
+                       for (i = 0; i < n; i++)
+                               if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
+                                       break;
+                       pp->output_owner = (pp->output_owner + i) % n;
+               }
+       }
+       return result;
+}
+
+int run_processes_parallel(int n,
+                          get_next_task_fn get_next_task,
+                          start_failure_fn start_failure,
+                          task_finished_fn task_finished,
+                          void *pp_cb)
+{
+       int i, code;
+       int output_timeout = 100;
+       int spawn_cap = 4;
+       struct parallel_processes pp;
+
+       pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
+       while (1) {
+               for (i = 0;
+                   i < spawn_cap && !pp.shutdown &&
+                   pp.nr_processes < pp.max_processes;
+                   i++) {
+                       code = pp_start_one(&pp);
+                       if (!code)
+                               continue;
+                       if (code < 0) {
+                               pp.shutdown = 1;
+                               kill_children(&pp, -code);
+                       }
+                       break;
+               }
+               if (!pp.nr_processes)
+                       break;
+               pp_buffer_stderr(&pp, output_timeout);
+               pp_output(&pp);
+               code = pp_collect_finished(&pp);
+               if (code) {
+                       pp.shutdown = 1;
+                       if (code < 0)
+                               kill_children(&pp, -code);
+               }
+       }
+
+       pp_cleanup(&pp);
+       return 0;
+}
index 12bb26c2a6155750203babfec47b08e8bde0ad27..d5a57f922781de57d8ee6ae56f90af1456ccd5d5 100644 (file)
@@ -122,4 +122,84 @@ int start_async(struct async *async);
 int finish_async(struct async *async);
 int in_async(void);
 
+/**
+ * This callback should initialize the child process and preload the
+ * error channel if desired. The preloading is useful if you want to
+ * have a message printed directly before the output of the child process.
+ * pp_cb is the callback cookie as passed to run_processes_parallel.
+ * You can store a child process specific callback cookie in pp_task_cb.
+ *
+ * Even after returning 0 to indicate that there are no more tasks,
+ * this function will be called again until there are no more running
+ * child processes.
+ *
+ * Return 1 if the next child is ready to run.
+ * Return 0 if there are currently no more tasks to be processed.
+ * To abort the remaining child processes, return the negative of the
+ * signal number that should be sent to them.
+ */
+typedef int (*get_next_task_fn)(struct child_process *cp,
+                               struct strbuf *err,
+                               void *pp_cb,
+                               void **pp_task_cb);
+
+/**
+ * This callback is called whenever there are problems starting
+ * a new process.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * Return 0 to continue the parallel processing; return non-zero to abort.
+ * To abort and send a signal to the other child processes, return the
+ * negative of the signal number that should be sent to them.
+ */
+typedef int (*start_failure_fn)(struct child_process *cp,
+                               struct strbuf *err,
+                               void *pp_cb,
+                               void *pp_task_cb);
+
+/**
+ * This callback is called on every child process that finished processing.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * Return 0 to continue the parallel processing; return non-zero to abort.
+ * To abort and send a signal to the other child processes, return the
+ * negative of the signal number that should be sent to them.
+ */
+typedef int (*task_finished_fn)(int result,
+                               struct child_process *cp,
+                               struct strbuf *err,
+                               void *pp_cb,
+                               void *pp_task_cb);
+
+/**
+ * Runs up to n processes at the same time. Whenever a process can be
+ * started, the callback get_next_task_fn is called to obtain the data
+ * required to start another child process.
+ *
+ * The children started via this function run in parallel. Their output
+ * (both stdout and stderr) is routed to stderr so that output from
+ * different tasks does not interleave.
+ *
+ * If start_failure_fn or task_finished_fn are NULL, default handlers
+ * will be used. The default handlers will print an error message on
+ * error without issuing an emergency stop.
+ */
+int run_processes_parallel(int n,
+                          get_next_task_fn,
+                          start_failure_fn,
+                          task_finished_fn,
+                          void *pp_cb);
+
 #endif
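
To make the callback contract above concrete, here is a minimal, hypothetical
caller modeled on the test-run-command.c helper near the end of this patch;
the echo_tasks struct and the echo_next callback are invented for illustration
and are not part of the API itself.

#include "git-compat-util.h"
#include "run-command.h"
#include "argv-array.h"
#include "strbuf.h"

/* Illustrative bookkeeping passed around as the callback cookie. */
struct echo_tasks {
	int next;
	int nr;
};

/* get_next_task_fn: hand out one "echo" child per remaining task. */
static int echo_next(struct child_process *cp, struct strbuf *err,
		     void *pp_cb, void **pp_task_cb)
{
	struct echo_tasks *tasks = pp_cb;

	if (tasks->next >= tasks->nr)
		return 0;		/* no more tasks to hand out */

	argv_array_pushl(&cp->args, "echo", "hello from a child", NULL);
	/* preloaded message, printed right before the child's own output */
	strbuf_addf(err, "starting task %d\n", tasks->next++);
	return 1;			/* this child is ready to run */
}

static void run_four_echoes(void)
{
	struct echo_tasks tasks = { 0, 4 };

	/* NULL picks the default start_failure/task_finished handlers. */
	run_processes_parallel(2, echo_next, NULL, NULL, &tasks);
}

With n = 2 at most two children run at any time; each child's output is
flushed as one block, so the per-task messages do not interleave.
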
index faa375d5d8e7cf6bbba599034308028efaba1e0f..2ac43bbd282492e3c35c6ac5f96b14a541ed74c6 100644 (file)
@@ -50,3 +50,12 @@ void sigchain_push_common(sigchain_fun f)
        sigchain_push(SIGQUIT, f);
        sigchain_push(SIGPIPE, f);
 }
+
+void sigchain_pop_common(void)
+{
+       sigchain_pop(SIGPIPE);
+       sigchain_pop(SIGQUIT);
+       sigchain_pop(SIGTERM);
+       sigchain_pop(SIGHUP);
+       sigchain_pop(SIGINT);
+}
index 618083bce0c66a551fd4d894b31520a67b25bac9..138b20f54b4017c1f38036935f483c628094affb 100644 (file)
@@ -7,5 +7,6 @@ int sigchain_push(int sig, sigchain_fun f);
 int sigchain_pop(int sig);
 
 void sigchain_push_common(sigchain_fun f);
+void sigchain_pop_common(void);
 
 #endif /* SIGCHAIN_H */
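
For orientation, the new sigchain_pop_common() simply undoes a matching
sigchain_push_common(), exactly as pp_init()/pp_cleanup() above do around the
lifetime of the parallel processor. A minimal sketch, with a hypothetical
handler name:

#include "git-compat-util.h"
#include "sigchain.h"

/* Hypothetical handler: tidy up, then let the default disposition run. */
static void cleanup_on_signal(int signo)
{
	/* ... release temporary state here ... */
	sigchain_pop(signo);
	raise(signo);
}

static void do_guarded_work(void)
{
	sigchain_push_common(cleanup_on_signal); /* INT, HUP, TERM, QUIT, PIPE */

	/* ... work that must clean up when interrupted ... */

	sigchain_pop_common();                   /* restore previous handlers */
}
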
index d76f0aed85c4ec6eafdcfd1a8ee2b22d3d20df96..38686ffb65c2de953f9b224a5eecd22188081417 100644 (file)
--- a/strbuf.c
+++ b/strbuf.c
@@ -384,6 +384,17 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint)
        return sb->len - oldlen;
 }
 
+ssize_t strbuf_read_once(struct strbuf *sb, int fd, size_t hint)
+{
+       ssize_t cnt;
+
+       strbuf_grow(sb, hint ? hint : 8192);
+       cnt = xread(fd, sb->buf + sb->len, sb->alloc - sb->len - 1);
+       if (cnt > 0)
+               strbuf_setlen(sb, sb->len + cnt);
+       return cnt;
+}
+
 #define STRBUF_MAXLINK (2*PATH_MAX)
 
 int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint)
index 7123fca7aff5182b66af6d76991552081139467c..2bf90e70fcf1e9bcbfcbfdf4bc8b30e1d7b61d10 100644 (file)
--- a/strbuf.h
+++ b/strbuf.h
@@ -366,6 +366,14 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
  */
 extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
 
+/**
+ * Read the contents of a given file descriptor partially, with a single
+ * xread() attempt. The third argument can be used to give a hint about the
+ * file size, to avoid reallocs. Returns the number of new bytes appended to
+ * the sb.
+ */
+extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint);
+
 /**
  * Read the contents of a file, specified by its path. The third argument
  * can be used to give a hint about the file size, to avoid reallocs.
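
As a usage sketch mirroring pp_buffer_stderr() above: a caller typically waits
on the descriptor with poll() and then drains whatever is currently available
with a single strbuf_read_once() call. The helper below is illustrative and
not part of this series.

#include "git-compat-util.h"
#include "strbuf.h"

/*
 * Append whatever is readable right now on fd to sb.
 * Returns 0 on EOF, 1 if the caller should try again later.
 */
static int drain_available(struct strbuf *sb, int fd)
{
	struct pollfd pfd;
	ssize_t n;

	pfd.fd = fd;
	pfd.events = POLLIN;
	if (poll(&pfd, 1, 100) <= 0)
		return 1;	/* nothing ready within 100ms */

	n = strbuf_read_once(sb, fd, 0);
	if (n == 0)
		return 0;	/* the writer closed its end */
	if (n < 0 && errno != EAGAIN)
		die_errno("read");
	return 1;
}
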
index 14e76247bf107da916cf100b8078cf10238e6b76..b83939c294782ac59e5c78c8882064a1e272e309 100644 (file)
@@ -12,6 +12,7 @@
 #include "sha1-array.h"
 #include "argv-array.h"
 #include "blob.h"
+#include "thread-utils.h"
 
 static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND;
 static struct string_list changed_submodule_paths;
@@ -610,37 +611,28 @@ static void calculate_changed_submodule_paths(void)
        initialized_fetch_ref_tips = 0;
 }
 
-int fetch_populated_submodules(const struct argv_array *options,
-                              const char *prefix, int command_line_option,
-                              int quiet)
+struct submodule_parallel_fetch {
+       int count;
+       struct argv_array args;
+       const char *work_tree;
+       const char *prefix;
+       int command_line_option;
+       int quiet;
+       int result;
+};
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
+
+static int get_next_submodule(struct child_process *cp,
+                             struct strbuf *err, void *data, void **task_cb)
 {
-       int i, result = 0;
-       struct child_process cp = CHILD_PROCESS_INIT;
-       struct argv_array argv = ARGV_ARRAY_INIT;
-       const char *work_tree = get_git_work_tree();
-       if (!work_tree)
-               goto out;
-
-       if (read_cache() < 0)
-               die("index file corrupt");
-
-       argv_array_push(&argv, "fetch");
-       for (i = 0; i < options->argc; i++)
-               argv_array_push(&argv, options->argv[i]);
-       argv_array_push(&argv, "--recurse-submodules-default");
-       /* default value, "--submodule-prefix" and its value are added later */
-
-       cp.env = local_repo_env;
-       cp.git_cmd = 1;
-       cp.no_stdin = 1;
-
-       calculate_changed_submodule_paths();
+       int ret = 0;
+       struct submodule_parallel_fetch *spf = data;
 
-       for (i = 0; i < active_nr; i++) {
+       for (; spf->count < active_nr; spf->count++) {
                struct strbuf submodule_path = STRBUF_INIT;
                struct strbuf submodule_git_dir = STRBUF_INIT;
                struct strbuf submodule_prefix = STRBUF_INIT;
-               const struct cache_entry *ce = active_cache[i];
+               const struct cache_entry *ce = active_cache[spf->count];
                const char *git_dir, *default_argv;
                const struct submodule *submodule;
 
@@ -652,7 +644,7 @@ int fetch_populated_submodules(const struct argv_array *options,
                        submodule = submodule_from_name(null_sha1, ce->name);
 
                default_argv = "yes";
-               if (command_line_option == RECURSE_SUBMODULES_DEFAULT) {
+               if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) {
                        if (submodule &&
                            submodule->fetch_recurse !=
                                                RECURSE_SUBMODULES_NONE) {
@@ -675,40 +667,101 @@ int fetch_populated_submodules(const struct argv_array *options,
                                        default_argv = "on-demand";
                                }
                        }
-               } else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
+               } else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
                        if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name))
                                continue;
                        default_argv = "on-demand";
                }
 
-               strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name);
+               strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name);
                strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf);
-               strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name);
+               strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name);
                git_dir = read_gitfile(submodule_git_dir.buf);
                if (!git_dir)
                        git_dir = submodule_git_dir.buf;
                if (is_directory(git_dir)) {
-                       if (!quiet)
-                               printf("Fetching submodule %s%s\n", prefix, ce->name);
-                       cp.dir = submodule_path.buf;
-                       argv_array_push(&argv, default_argv);
-                       argv_array_push(&argv, "--submodule-prefix");
-                       argv_array_push(&argv, submodule_prefix.buf);
-                       cp.argv = argv.argv;
-                       if (run_command(&cp))
-                               result = 1;
-                       argv_array_pop(&argv);
-                       argv_array_pop(&argv);
-                       argv_array_pop(&argv);
+                       child_process_init(cp);
+                       cp->dir = strbuf_detach(&submodule_path, NULL);
+                       cp->env = local_repo_env;
+                       cp->git_cmd = 1;
+                       if (!spf->quiet)
+                               strbuf_addf(err, "Fetching submodule %s%s\n",
+                                           spf->prefix, ce->name);
+                       argv_array_init(&cp->args);
+                       argv_array_pushv(&cp->args, spf->args.argv);
+                       argv_array_push(&cp->args, default_argv);
+                       argv_array_push(&cp->args, "--submodule-prefix");
+                       argv_array_push(&cp->args, submodule_prefix.buf);
+                       ret = 1;
                }
                strbuf_release(&submodule_path);
                strbuf_release(&submodule_git_dir);
                strbuf_release(&submodule_prefix);
+               if (ret) {
+                       spf->count++;
+                       return 1;
+               }
        }
-       argv_array_clear(&argv);
+       return 0;
+}
+
+static int fetch_start_failure(struct child_process *cp,
+                              struct strbuf *err,
+                              void *cb, void *task_cb)
+{
+       struct submodule_parallel_fetch *spf = cb;
+
+       spf->result = 1;
+
+       return 0;
+}
+
+static int fetch_finish(int retvalue, struct child_process *cp,
+                       struct strbuf *err, void *cb, void *task_cb)
+{
+       struct submodule_parallel_fetch *spf = cb;
+
+       if (retvalue)
+               spf->result = 1;
+
+       return 0;
+}
+
+int fetch_populated_submodules(const struct argv_array *options,
+                              const char *prefix, int command_line_option,
+                              int quiet, int max_parallel_jobs)
+{
+       int i;
+       struct submodule_parallel_fetch spf = SPF_INIT;
+
+       spf.work_tree = get_git_work_tree();
+       spf.command_line_option = command_line_option;
+       spf.quiet = quiet;
+       spf.prefix = prefix;
+
+       if (!spf.work_tree)
+               goto out;
+
+       if (read_cache() < 0)
+               die("index file corrupt");
+
+       argv_array_push(&spf.args, "fetch");
+       for (i = 0; i < options->argc; i++)
+               argv_array_push(&spf.args, options->argv[i]);
+       argv_array_push(&spf.args, "--recurse-submodules-default");
+       /* default value, "--submodule-prefix" and its value are added later */
+
+       calculate_changed_submodule_paths();
+       run_processes_parallel(max_parallel_jobs,
+                              get_next_submodule,
+                              fetch_start_failure,
+                              fetch_finish,
+                              &spf);
+
+       argv_array_clear(&spf.args);
 out:
        string_list_clear(&changed_submodule_paths, 1);
-       return result;
+       return spf.result;
 }
 
 unsigned is_submodule_modified(const char *path, int ignore_untracked)
index ddff512109cbf24f0ead87676f1fc8e6cb92f897..e06eaa5ebb30e825fd0721c76e7d194b0b854706 100644 (file)
@@ -32,7 +32,7 @@ void set_config_fetch_recurse_submodules(int value);
 void check_for_new_submodule_commits(unsigned char new_sha1[20]);
 int fetch_populated_submodules(const struct argv_array *options,
                               const char *prefix, int command_line_option,
-                              int quiet);
+                              int quiet, int max_parallel_jobs);
 unsigned is_submodule_modified(const char *path, int ignore_untracked);
 int submodule_uses_gitfile(const char *path);
 int ok_to_remove_submodule(const char *path);
index 9acf628726fe0c648279fb724f1917435db206f7..12228b4aa6cb6a393a006130b31c73625911cec6 100755 (executable)
@@ -47,4 +47,57 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
        test_cmp expect actual
 '
 
+cat >expect <<-EOF
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+EOF
+
+test_expect_success 'run_command runs in parallel with more jobs available than tasks' '
+       test-run-command run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+       test_cmp expect actual
+'
+
+test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
+       test-run-command run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+       test_cmp expect actual
+'
+
+test_expect_success 'run_command runs in parallel with more tasks than jobs available' '
+       test-run-command run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+       test_cmp expect actual
+'
+
+cat >expect <<-EOF
+preloaded output of a child
+asking for a quick stop
+preloaded output of a child
+asking for a quick stop
+preloaded output of a child
+asking for a quick stop
+EOF
+
+test_expect_success 'run_command is asked to abort gracefully' '
+       test-run-command run-command-abort 3 false 2>actual &&
+       test_cmp expect actual
+'
+
+cat >expect <<-EOF
+no further jobs available
+EOF
+
+test_expect_success 'run_command reports that no further jobs are available' '
+       test-run-command run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+       test_cmp expect actual
+'
+
 test_done
index a4532b00d6cbf791a6654b00742138728d4bf66d..1241146227aead97022ade343152347bc911444c 100755 (executable)
@@ -16,7 +16,8 @@ add_upstream_commit() {
                git add subfile &&
                git commit -m new subfile &&
                head2=$(git rev-parse --short HEAD) &&
-               echo "From $pwd/submodule" > ../expect.err &&
+               echo "Fetching submodule submodule" > ../expect.err &&
+               echo "From $pwd/submodule" >> ../expect.err &&
                echo "   $head1..$head2  master     -> origin/master" >> ../expect.err
        ) &&
        (
@@ -27,6 +28,7 @@ add_upstream_commit() {
                git add deepsubfile &&
                git commit -m new deepsubfile &&
                head2=$(git rev-parse --short HEAD) &&
+               echo "Fetching submodule submodule/subdir/deepsubmodule" >> ../expect.err &&
                echo "From $pwd/deepsubmodule" >> ../expect.err &&
                echo "   $head1..$head2  master     -> origin/master" >> ../expect.err
        )
@@ -56,9 +58,7 @@ test_expect_success setup '
        (
                cd downstream &&
                git submodule update --init --recursive
-       ) &&
-       echo "Fetching submodule submodule" > expect.out &&
-       echo "Fetching submodule submodule/subdir/deepsubmodule" >> expect.out
+       )
 '
 
 test_expect_success "fetch --recurse-submodules recurses into submodules" '
@@ -67,10 +67,21 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
                cd downstream &&
                git fetch --recurse-submodules >../actual.out 2>../actual.err
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
+test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" '
+       add_upstream_commit &&
+       (
+               cd downstream &&
+               GIT_TRACE=$(pwd)/../trace.out git fetch --recurse-submodules -j2 2>../actual.err
+       ) &&
+       test_must_be_empty actual.out &&
+       test_i18ncmp expect.err actual.err &&
+       grep "2 tasks" trace.out
+'
+
 test_expect_success "fetch alone only fetches superproject" '
        add_upstream_commit &&
        (
@@ -96,7 +107,7 @@ test_expect_success "using fetchRecurseSubmodules=true in .gitmodules recurses i
                git config -f .gitmodules submodule.submodule.fetchRecurseSubmodules true &&
                git fetch >../actual.out 2>../actual.err
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
@@ -127,7 +138,7 @@ test_expect_success "--recurse-submodules overrides fetchRecurseSubmodules setti
                git config --unset -f .gitmodules submodule.submodule.fetchRecurseSubmodules &&
                git config --unset submodule.submodule.fetchRecurseSubmodules
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
@@ -140,13 +151,22 @@ test_expect_success "--quiet propagates to submodules" '
        ! test -s actual.err
 '
 
+test_expect_success "--quiet propagates to parallel submodules" '
+       (
+               cd downstream &&
+               git fetch --recurse-submodules -j 2 --quiet  >../actual.out 2>../actual.err
+       ) &&
+       ! test -s actual.out &&
+       ! test -s actual.err
+'
+
 test_expect_success "--dry-run propagates to submodules" '
        add_upstream_commit &&
        (
                cd downstream &&
                git fetch --recurse-submodules --dry-run >../actual.out 2>../actual.err
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
@@ -155,7 +175,7 @@ test_expect_success "Without --dry-run propagates to submodules" '
                cd downstream &&
                git fetch --recurse-submodules >../actual.out 2>../actual.err
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
@@ -166,7 +186,7 @@ test_expect_success "recurseSubmodules=true propagates into submodules" '
                git config fetch.recurseSubmodules true
                git fetch >../actual.out 2>../actual.err
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
@@ -180,7 +200,7 @@ test_expect_success "--recurse-submodules overrides config in submodule" '
                ) &&
                git fetch --recurse-submodules >../actual.out 2>../actual.err
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
@@ -214,16 +234,15 @@ test_expect_success "Recursion stops when no new submodule commits are fetched"
        git add submodule &&
        git commit -m "new submodule" &&
        head2=$(git rev-parse --short HEAD) &&
-       echo "Fetching submodule submodule" > expect.out.sub &&
        echo "From $pwd/." > expect.err.sub &&
        echo "   $head1..$head2  master     -> origin/master" >>expect.err.sub &&
-       head -2 expect.err >> expect.err.sub &&
+       head -3 expect.err >> expect.err.sub &&
        (
                cd downstream &&
                git fetch >../actual.out 2>../actual.err
        ) &&
        test_i18ncmp expect.err.sub actual.err &&
-       test_i18ncmp expect.out.sub actual.out
+       test_must_be_empty actual.out
 '
 
 test_expect_success "Recursion doesn't happen when new superproject commits don't change any submodules" '
@@ -269,7 +288,7 @@ test_expect_success "Recursion picks up config in submodule" '
                )
        ) &&
        test_i18ncmp expect.err.sub actual.err &&
-       test_i18ncmp expect.out actual.out
+       test_must_be_empty actual.out
 '
 
 test_expect_success "Recursion picks up all submodules when necessary" '
@@ -285,7 +304,8 @@ test_expect_success "Recursion picks up all submodules when necessary" '
                git add subdir/deepsubmodule &&
                git commit -m "new deepsubmodule"
                head2=$(git rev-parse --short HEAD) &&
-               echo "From $pwd/submodule" > ../expect.err.sub &&
+               echo "Fetching submodule submodule" > ../expect.err.sub &&
+               echo "From $pwd/submodule" >> ../expect.err.sub &&
                echo "   $head1..$head2  master     -> origin/master" >> ../expect.err.sub
        ) &&
        head1=$(git rev-parse --short HEAD) &&
@@ -295,13 +315,13 @@ test_expect_success "Recursion picks up all submodules when necessary" '
        echo "From $pwd/." > expect.err.2 &&
        echo "   $head1..$head2  master     -> origin/master" >> expect.err.2 &&
        cat expect.err.sub >> expect.err.2 &&
-       tail -2 expect.err >> expect.err.2 &&
+       tail -3 expect.err >> expect.err.2 &&
        (
                cd downstream &&
                git fetch >../actual.out 2>../actual.err
        ) &&
        test_i18ncmp expect.err.2 actual.err &&
-       test_i18ncmp expect.out actual.out
+       test_must_be_empty actual.out
 '
 
 test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no new commits are fetched in the superproject (and ignores config)" '
@@ -317,7 +337,8 @@ test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no ne
                git add subdir/deepsubmodule &&
                git commit -m "new deepsubmodule" &&
                head2=$(git rev-parse --short HEAD) &&
-               echo "From $pwd/submodule" > ../expect.err.sub &&
+               echo "Fetching submodule submodule" > ../expect.err.sub &&
+               echo "From $pwd/submodule" >> ../expect.err.sub &&
                echo "   $head1..$head2  master     -> origin/master" >> ../expect.err.sub
        ) &&
        (
@@ -335,7 +356,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
        git add submodule &&
        git commit -m "new submodule" &&
        head2=$(git rev-parse --short HEAD) &&
-       tail -2 expect.err > expect.err.deepsub &&
+       tail -3 expect.err > expect.err.deepsub &&
        echo "From $pwd/." > expect.err &&
        echo "   $head1..$head2  master     -> origin/master" >>expect.err &&
        cat expect.err.sub >> expect.err &&
@@ -354,7 +375,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
                        git config --unset -f .gitmodules submodule.subdir/deepsubmodule.fetchRecursive
                )
        ) &&
-       test_i18ncmp expect.out actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err actual.err
 '
 
@@ -388,7 +409,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
        head2=$(git rev-parse --short HEAD) &&
        echo "From $pwd/." > expect.err.2 &&
        echo "   $head1..$head2  master     -> origin/master" >>expect.err.2 &&
-       head -2 expect.err >> expect.err.2 &&
+       head -3 expect.err >> expect.err.2 &&
        (
                cd downstream &&
                git config fetch.recurseSubmodules on-demand &&
@@ -399,7 +420,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
                cd downstream &&
                git config --unset fetch.recurseSubmodules
        ) &&
-       test_i18ncmp expect.out.sub actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err.2 actual.err
 '
 
@@ -416,7 +437,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
        head2=$(git rev-parse --short HEAD) &&
        echo "From $pwd/." > expect.err.2 &&
        echo "   $head1..$head2  master     -> origin/master" >>expect.err.2 &&
-       head -2 expect.err >> expect.err.2 &&
+       head -3 expect.err >> expect.err.2 &&
        (
                cd downstream &&
                git config submodule.submodule.fetchRecurseSubmodules on-demand &&
@@ -427,7 +448,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
                cd downstream &&
                git config --unset submodule.submodule.fetchRecurseSubmodules
        ) &&
-       test_i18ncmp expect.out.sub actual.out &&
+       test_must_be_empty actual.out &&
        test_i18ncmp expect.err.2 actual.err
 '
 
index 89c7de2c600ce2781ce666324e703d555937740a..fbe0a27ef3295fe3312fb72a478571bfb450cb25 100644 (file)
 
 #include "git-compat-util.h"
 #include "run-command.h"
+#include "argv-array.h"
+#include "strbuf.h"
 #include <string.h>
 #include <errno.h>
 
+static int number_callbacks;
+static int parallel_next(struct child_process *cp,
+                        struct strbuf *err,
+                        void *cb,
+                        void **task_cb)
+{
+       struct child_process *d = cb;
+       if (number_callbacks >= 4)
+               return 0;
+
+       argv_array_pushv(&cp->args, d->argv);
+       strbuf_addf(err, "preloaded output of a child\n");
+       number_callbacks++;
+       return 1;
+}
+
+static int no_job(struct child_process *cp,
+                 struct strbuf *err,
+                 void *cb,
+                 void **task_cb)
+{
+       strbuf_addf(err, "no further jobs available\n");
+       return 0;
+}
+
+static int task_finished(int result,
+                        struct child_process *cp,
+                        struct strbuf *err,
+                        void *pp_cb,
+                        void *pp_task_cb)
+{
+       strbuf_addf(err, "asking for a quick stop\n");
+       return 1;
+}
+
 int main(int argc, char **argv)
 {
        struct child_process proc = CHILD_PROCESS_INIT;
+       int jobs;
 
        if (argc < 3)
                return 1;
-       proc.argv = (const char **)argv+2;
+       proc.argv = (const char **)argv + 2;
 
        if (!strcmp(argv[1], "start-command-ENOENT")) {
                if (start_command(&proc) < 0 && errno == ENOENT)
@@ -30,6 +68,21 @@ int main(int argc, char **argv)
        if (!strcmp(argv[1], "run-command"))
                exit(run_command(&proc));
 
+       jobs = atoi(argv[2]);
+       proc.argv = (const char **)argv + 3;
+
+       if (!strcmp(argv[1], "run-command-parallel"))
+               exit(run_processes_parallel(jobs, parallel_next,
+                                           NULL, NULL, &proc));
+
+       if (!strcmp(argv[1], "run-command-abort"))
+               exit(run_processes_parallel(jobs, parallel_next,
+                                           NULL, task_finished, &proc));
+
+       if (!strcmp(argv[1], "run-command-no-jobs"))
+               exit(run_processes_parallel(jobs, no_job,
+                                           NULL, task_finished, &proc));
+
        fprintf(stderr, "check usage\n");
        return 1;
 }
index c95e2906b850749cbeb109050359b671979969f2..b43d437630c9245a6a48a0940df1607e21357362 100644 (file)
--- a/wrapper.c
+++ b/wrapper.c
@@ -236,8 +236,24 @@ ssize_t xread(int fd, void *buf, size_t len)
            len = MAX_IO_SIZE;
        while (1) {
                nr = read(fd, buf, len);
-               if ((nr < 0) && (errno == EAGAIN || errno == EINTR))
-                       continue;
+               if (nr < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                               struct pollfd pfd;
+                               pfd.events = POLLIN;
+                               pfd.fd = fd;
+                               /*
+                                * it is OK if this poll() failed; we
+                                * want to leave this infinite loop
+                                * only when read() returns with
+                                * success, or an expected failure,
+                                * which would be checked by the next
+                                * call to read(2).
+                                */
+                               poll(&pfd, 1, -1);
+                       }
+               }
                return nr;
        }
 }
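
For completeness, the new EAGAIN/EWOULDBLOCK branch only matters for
descriptors that a caller has put into non-blocking mode; the O_NONBLOCK setup
below is plain POSIX and purely a hypothetical sketch, not part of this patch.

#include "git-compat-util.h"

/* Hypothetical: switch a pipe end to non-blocking mode. */
static int make_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags < 0)
		return -1;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * With the fd non-blocking, xread() no longer busy-loops: on EAGAIN it
 * sleeps in poll() until data arrives and then reports the condition to
 * the caller, which can simply try again.
 */
static ssize_t read_some(int fd, char *buf, size_t len)
{
	ssize_t n;

	do {
		n = xread(fd, buf, len);
	} while (n < 0 && errno == EAGAIN);
	return n;
}
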