Merge branch 'nd/threaded-index-pack'
authorJunio C Hamano <gitster@pobox.com>
Mon, 14 May 2012 18:50:40 +0000 (11:50 -0700)
committerJunio C Hamano <gitster@pobox.com>
Mon, 14 May 2012 18:50:40 +0000 (11:50 -0700)
Enables threading in index-pack to resolve base data in parallel.

By Nguyễn Thái Ngọc Duy (3) and Ramsay Jones (1)
* nd/threaded-index-pack:
index-pack: disable threading if NO_PREAD is defined
index-pack: support multithreaded delta resolving
index-pack: restructure pack processing into three main functions
compat/win32/pthread.h: Add an pthread_key_delete() implementation

1  2 
Makefile
builtin/index-pack.c
diff --cc Makefile
Simple merge
index 83555e56353a8787a8582d69e50ec2147d9c4929,807ee56f79865cb04376268b051dceda5133f329..dc2cfe6e6f63b6628f8a358f2f3b3e65c14e3b8d
@@@ -223,9 -305,28 +308,28 @@@ static NORETURN void bad_object(unsigne
        va_start(params, format);
        vsnprintf(buf, sizeof(buf), format, params);
        va_end(params);
 -      die("pack has bad object at offset %lu: %s", offset, buf);
 +      die(_("pack has bad object at offset %lu: %s"), offset, buf);
  }
  
+ static inline struct thread_local *get_thread_data(void)
+ {
+ #ifndef NO_PTHREADS
+       if (threads_active)
+               return pthread_getspecific(key);
+       assert(!threads_active &&
+              "This should only be reached when all threads are gone");
+ #endif
+       return &nothread_data;
+ }
+ #ifndef NO_PTHREADS
+ static void set_thread_data(struct thread_local *data)
+ {
+       if (threads_active)
+               pthread_setspecific(key, data);
+ }
+ #endif
  static struct base_data *alloc_base_data(void)
  {
        struct base_data *base = xmalloc(sizeof(struct base_data));
@@@ -472,14 -572,18 +578,18 @@@ static void sha1_object(const void *dat
                enum object_type has_type;
                unsigned long has_size;
                has_data = read_sha1_file(sha1, &has_type, &has_size);
+               read_unlock();
                if (!has_data)
 -                      die("cannot read existing object %s", sha1_to_hex(sha1));
 +                      die(_("cannot read existing object %s"), sha1_to_hex(sha1));
                if (size != has_size || type != has_type ||
                    memcmp(data, has_data, size) != 0)
 -                      die("SHA1 COLLISION FOUND WITH %s !", sha1_to_hex(sha1));
 +                      die(_("SHA1 COLLISION FOUND WITH %s !"), sha1_to_hex(sha1));
                free(has_data);
-       }
+       } else
+               read_unlock();
        if (strict) {
+               read_lock();
                if (type == OBJ_BLOB) {
                        struct blob *blob = lookup_blob(sha1);
                        if (blob)
@@@ -573,8 -678,8 +684,8 @@@ static void *get_base_data(struct base_
                                &c->size);
                        free(raw);
                        if (!c->data)
 -                              bad_object(obj->idx.offset, "failed to apply delta");
 +                              bad_object(obj->idx.offset, _("failed to apply delta"));
-                       base_cache_used += c->size;
+                       get_thread_data()->base_cache_used += c->size;
                        prune_base_data(c);
                }
                free(delta);
@@@ -599,10 -704,12 +710,12 @@@ static void resolve_delta(struct object
                                   delta_data, delta_obj->size, &result->size);
        free(delta_data);
        if (!result->data)
 -              bad_object(delta_obj->idx.offset, "failed to apply delta");
 +              bad_object(delta_obj->idx.offset, _("failed to apply delta"));
        sha1_object(result->data, result->size, delta_obj->real_type,
                    delta_obj->idx.sha1);
+       counter_lock();
        nr_resolved_deltas++;
+       counter_unlock();
  }
  
  static struct base_data *find_unresolved_deltas_1(struct base_data *base,
@@@ -695,15 -839,9 +845,9 @@@ static void parse_pack_objects(unsigne
        struct delta_entry *delta = deltas;
        struct stat st;
  
-       /*
-        * First pass:
-        * - find locations of all objects;
-        * - calculate SHA1 of all non-delta objects;
-        * - remember base (SHA1 or offset) for all deltas.
-        */
        if (verbose)
                progress = start_progress(
 -                              from_stdin ? "Receiving objects" : "Indexing objects",
 +                              from_stdin ? _("Receiving objects") : _("Indexing objects"),
                                nr_objects);
        for (i = 0; i < nr_objects; i++) {
                struct object_entry *obj = &objects[i];
  
        /* If input_fd is a file, we should have reached its end now. */
        if (fstat(input_fd, &st))
 -              die_errno("cannot fstat packfile");
 +              die_errno(_("cannot fstat packfile"));
        if (S_ISREG(st.st_mode) &&
                        lseek(input_fd, 0, SEEK_CUR) - input_len != st.st_size)
 -              die("pack has junk at the end");
 +              die(_("pack has junk at the end"));
+ }
+ /*
+  * Second pass:
+  * - for all non-delta objects, look if it is used as a base for
+  *   deltas;
+  * - if used as a base, uncompress the object and apply all deltas,
+  *   recursively checking if the resulting object is used as a base
+  *   for some more deltas.
+  */
+ static void resolve_deltas(void)
+ {
+       int i;
  
        if (!nr_deltas)
                return;
        qsort(deltas, nr_deltas, sizeof(struct delta_entry),
              compare_delta_entry);
  
-       /*
-        * Second pass:
-        * - for all non-delta objects, look if it is used as a base for
-        *   deltas;
-        * - if used as a base, uncompress the object and apply all deltas,
-        *   recursively checking if the resulting object is used as a base
-        *   for some more deltas.
-        */
        if (verbose)
 -              progress = start_progress("Resolving deltas", nr_deltas);
 +              progress = start_progress(_("Resolving deltas"), nr_deltas);
+ #ifndef NO_PTHREADS
+       nr_dispatched = 0;
+       if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+               init_thread();
+               for (i = 0; i < nr_threads; i++) {
+                       int ret = pthread_create(&thread_data[i].thread, NULL,
+                                                threaded_second_pass, thread_data + i);
+                       if (ret)
+                               die("unable to create thread: %s", strerror(ret));
+               }
+               for (i = 0; i < nr_threads; i++)
+                       pthread_join(thread_data[i].thread, NULL);
+               cleanup_thread();
+               return;
+       }
+ #endif
        for (i = 0; i < nr_objects; i++) {
                struct object_entry *obj = &objects[i];
-               struct base_data *base_obj = alloc_base_data();
  
                if (is_delta_type(obj->type))
                        continue;
        }
  }
  
 -                      die("confusion beyond insanity");
+ /*
+  * Third pass:
+  * - append objects to convert thin pack to full pack if required
+  * - write the final 20-byte SHA-1
+  */
+ static void fix_unresolved_deltas(struct sha1file *f, int nr_unresolved);
+ static void conclude_pack(int fix_thin_pack, const char *curr_pack, unsigned char *pack_sha1)
+ {
+       if (nr_deltas == nr_resolved_deltas) {
+               stop_progress(&progress);
+               /* Flush remaining pack final 20-byte SHA1. */
+               flush();
+               return;
+       }
+       if (fix_thin_pack) {
+               struct sha1file *f;
+               unsigned char read_sha1[20], tail_sha1[20];
+               char msg[48];
+               int nr_unresolved = nr_deltas - nr_resolved_deltas;
+               int nr_objects_initial = nr_objects;
+               if (nr_unresolved <= 0)
 -              die("pack has %d unresolved deltas",
++                      die(_("confusion beyond insanity"));
+               objects = xrealloc(objects,
+                                  (nr_objects + nr_unresolved + 1)
+                                  * sizeof(*objects));
+               f = sha1fd(output_fd, curr_pack);
+               fix_unresolved_deltas(f, nr_unresolved);
+               sprintf(msg, "completed with %d local objects",
+                       nr_objects - nr_objects_initial);
+               stop_progress_msg(&progress, msg);
+               sha1close(f, tail_sha1, 0);
+               hashcpy(read_sha1, pack_sha1);
+               fixup_pack_header_footer(output_fd, pack_sha1,
+                                        curr_pack, nr_objects,
+                                        read_sha1, consumed_bytes-20);
+               if (hashcmp(read_sha1, tail_sha1) != 0)
+                       die("Unexpected tail checksum for %s "
+                           "(disk corruption?)", curr_pack);
+       }
+       if (nr_deltas != nr_resolved_deltas)
++              die(Q_("pack has %d unresolved delta",
++                     "pack has %d unresolved deltas",
++                     nr_deltas - nr_resolved_deltas),
+                   nr_deltas - nr_resolved_deltas);
+ }
  static int write_compressed(struct sha1file *f, void *in, unsigned int size)
  {
        git_zstream stream;