static const char *base_name;
static int progress = 1;
static int window = 10;
-static uint32_t pack_size_limit;
+static uint32_t pack_size_limit, pack_size_limit_cfg;
static int depth = 50;
static int delta_search_threads = 1;
static int pack_to_stdout;
/* nothing */;
deflateEnd(&stream);
datalen = stream.total_out;
- deflateEnd(&stream);
+
/*
* The object header is a byte of 'type' followed by zero or
* more bytes of length.
return freed_mem;
}
-static void find_deltas(struct object_entry **list, unsigned list_size,
+static void find_deltas(struct object_entry **list, unsigned *list_size,
int window, int depth, unsigned *processed)
{
- uint32_t i = 0, idx = 0, count = 0;
+ uint32_t i, idx = 0, count = 0;
unsigned int array_size = window * sizeof(struct unpacked);
struct unpacked *array;
unsigned long mem_usage = 0;
array = xmalloc(array_size);
memset(array, 0, array_size);
- do {
- struct object_entry *entry = list[i++];
+ for (;;) {
+ struct object_entry *entry = *list++;
struct unpacked *n = array + idx;
int j, max_depth, best_base = -1;
+ progress_lock();
+ if (!*list_size) {
+ progress_unlock();
+ break;
+ }
+ (*list_size)--;
+ if (!entry->preferred_base) {
+ (*processed)++;
+ display_progress(progress_state, *processed);
+ }
+ progress_unlock();
+
mem_usage -= free_unpacked(n);
n->entry = entry;
if (entry->preferred_base)
goto next;
- progress_lock();
- (*processed)++;
- display_progress(progress_state, *processed);
- progress_unlock();
-
/*
* If the current object is at pack edge, take the depth the
* objects that depend on the current object into account
count++;
if (idx >= window)
idx = 0;
- } while (i < list_size);
+ }
for (i = 0; i < window; ++i) {
free_delta_index(array[i].index);
#ifdef THREADED_DELTA_SEARCH
+/*
+ * The main thread waits on the condition that (at least) one of the workers
+ * has stopped working (which is indicated in the .working member of
+ * struct thread_params).
+ * When a work thread has completed its work, it sets .working to 0 and
+ * signals the main thread and waits on the condition that .data_ready
+ * becomes 1.
+ */
+
struct thread_params {
pthread_t thread;
struct object_entry **list;
unsigned list_size;
+ unsigned remaining;
int window;
int depth;
+ int working;
+ int data_ready;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
unsigned *processed;
};
-static pthread_mutex_t data_request = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t data_ready = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t data_provider = PTHREAD_MUTEX_INITIALIZER;
-static struct thread_params *data_requester;
+static pthread_cond_t progress_cond = PTHREAD_COND_INITIALIZER;
static void *threaded_find_deltas(void *arg)
{
struct thread_params *me = arg;
- for (;;) {
- pthread_mutex_lock(&data_request);
- data_requester = me;
- pthread_mutex_unlock(&data_provider);
- pthread_mutex_lock(&data_ready);
- pthread_mutex_unlock(&data_request);
+ while (me->remaining) {
+ find_deltas(me->list, &me->remaining,
+ me->window, me->depth, me->processed);
- if (!me->list_size)
- return NULL;
+ progress_lock();
+ me->working = 0;
+ pthread_cond_signal(&progress_cond);
+ progress_unlock();
- find_deltas(me->list, me->list_size,
- me->window, me->depth, me->processed);
+ /*
+ * We must not set ->data_ready before we wait on the
+ * condition because the main thread may have set it to 1
+ * before we get here. In order to be sure that new
+ * work is available if we see 1 in ->data_ready, it
+ * was initialized to 0 before this thread was spawned
+ * and we reset it to 0 right away.
+ */
+ pthread_mutex_lock(&me->mutex);
+ while (!me->data_ready)
+ pthread_cond_wait(&me->cond, &me->mutex);
+ me->data_ready = 0;
+ pthread_mutex_unlock(&me->mutex);
}
+ /* leave ->working 1 so that this doesn't get more work assigned */
+ return NULL;
}
static void ll_find_deltas(struct object_entry **list, unsigned list_size,
int window, int depth, unsigned *processed)
{
- struct thread_params *target, p[delta_search_threads];
- int i, ret;
- unsigned chunk_size;
+ struct thread_params p[delta_search_threads];
+ int i, ret, active_threads = 0;
if (delta_search_threads <= 1) {
- find_deltas(list, list_size, window, depth, processed);
+ find_deltas(list, &list_size, window, depth, processed);
return;
}
- pthread_mutex_lock(&data_provider);
- pthread_mutex_lock(&data_ready);
-
+ /* Partition the work amongst work threads. */
for (i = 0; i < delta_search_threads; i++) {
+ unsigned sub_size = list_size / (delta_search_threads - i);
+
p[i].window = window;
p[i].depth = depth;
p[i].processed = processed;
+ p[i].working = 1;
+ p[i].data_ready = 0;
+
+ /* try to split chunks on "path" boundaries */
+ while (sub_size && sub_size < list_size &&
+ list[sub_size]->hash &&
+ list[sub_size]->hash == list[sub_size-1]->hash)
+ sub_size++;
+
+ p[i].list = list;
+ p[i].list_size = sub_size;
+ p[i].remaining = sub_size;
+
+ list += sub_size;
+ list_size -= sub_size;
+ }
+
+ /* Start work threads. */
+ for (i = 0; i < delta_search_threads; i++) {
+ if (!p[i].list_size)
+ continue;
+ pthread_mutex_init(&p[i].mutex, NULL);
+ pthread_cond_init(&p[i].cond, NULL);
ret = pthread_create(&p[i].thread, NULL,
threaded_find_deltas, &p[i]);
if (ret)
die("unable to create thread: %s", strerror(ret));
+ active_threads++;
}
- /* this should be auto-tuned somehow */
- chunk_size = window * 1000;
+ /*
+ * Now let's wait for work completion. Each time a thread is done
+ * with its work, we steal half of the remaining work from the
+ * thread with the largest number of unprocessed objects and give
+ * it to that newly idle thread. This ensure good load balancing
+ * until the remaining object list segments are simply too short
+ * to be worth splitting anymore.
+ */
+ while (active_threads) {
+ struct thread_params *target = NULL;
+ struct thread_params *victim = NULL;
+ unsigned sub_size = 0;
- do {
- unsigned sublist_size = chunk_size;
- if (sublist_size > list_size)
- sublist_size = list_size;
+ progress_lock();
+ for (;;) {
+ for (i = 0; !target && i < delta_search_threads; i++)
+ if (!p[i].working)
+ target = &p[i];
+ if (target)
+ break;
+ pthread_cond_wait(&progress_cond, &progress_mutex);
+ }
- /* try to split chunks on "path" boundaries */
- while (sublist_size < list_size && list[sublist_size]->hash &&
- list[sublist_size]->hash == list[sublist_size-1]->hash)
- sublist_size++;
-
- pthread_mutex_lock(&data_provider);
- target = data_requester;
- target->list = list;
- target->list_size = sublist_size;
- pthread_mutex_unlock(&data_ready);
-
- list += sublist_size;
- list_size -= sublist_size;
- if (!sublist_size) {
+ for (i = 0; i < delta_search_threads; i++)
+ if (p[i].remaining > 2*window &&
+ (!victim || victim->remaining < p[i].remaining))
+ victim = &p[i];
+ if (victim) {
+ sub_size = victim->remaining / 2;
+ list = victim->list + victim->list_size - sub_size;
+ while (sub_size && list[0]->hash &&
+ list[0]->hash == list[-1]->hash) {
+ list++;
+ sub_size--;
+ }
+ if (!sub_size) {
+ /*
+ * It is possible for some "paths" to have
+ * so many objects that no hash boundary
+ * might be found. Let's just steal the
+ * exact half in that case.
+ */
+ sub_size = victim->remaining / 2;
+ list -= sub_size;
+ }
+ target->list = list;
+ victim->list_size -= sub_size;
+ victim->remaining -= sub_size;
+ }
+ target->list_size = sub_size;
+ target->remaining = sub_size;
+ target->working = 1;
+ progress_unlock();
+
+ pthread_mutex_lock(&target->mutex);
+ target->data_ready = 1;
+ pthread_cond_signal(&target->cond);
+ pthread_mutex_unlock(&target->mutex);
+
+ if (!sub_size) {
pthread_join(target->thread, NULL);
- i--;
+ pthread_cond_destroy(&target->cond);
+ pthread_mutex_destroy(&target->mutex);
+ active_threads--;
}
- } while (i);
+ }
}
#else
-#define ll_find_deltas find_deltas
+#define ll_find_deltas(l, s, w, d, p) find_deltas(l, &s, w, d, p)
#endif
static void prepare_pack(int window, int depth)
die("bad pack.indexversion=%d", pack_idx_default_version);
return 0;
}
+ if (!strcmp(k, "pack.packsizelimit")) {
+ pack_size_limit_cfg = git_config_ulong(k, v);
+ return 0;
+ }
return git_default_config(k, v);
}
while (fgets(line, sizeof(line), stdin) != NULL) {
int len = strlen(line);
- if (line[len - 1] == '\n')
+ if (len && line[len - 1] == '\n')
line[--len] = 0;
if (!len)
break;
die("bad revision '%s'", line);
}
- prepare_revision_walk(&revs);
+ if (prepare_revision_walk(&revs))
+ die("revision walk setup failed");
mark_edges_uninteresting(revs.commits, &revs, show_edge);
traverse_commit_list(&revs, show_commit, show_object);
}
if (!prefixcmp(arg, "--max-pack-size=")) {
char *end;
+ pack_size_limit_cfg = 0;
pack_size_limit = strtoul(arg+16, &end, 0) * 1024 * 1024;
if (!arg[16] || *end)
usage(pack_usage);
if (pack_to_stdout != !base_name)
usage(pack_usage);
+ if (!pack_to_stdout && !pack_size_limit)
+ pack_size_limit = pack_size_limit_cfg;
+
if (pack_to_stdout && pack_size_limit)
die("--max-pack-size cannot be used to build a pack for transfer.");