From: Junio C Hamano Date: Wed, 20 Mar 2019 06:16:06 +0000 (+0900) Subject: Merge branch 'jk/no-sigpipe-during-network-transport' X-Git-Tag: v2.22.0-rc0~139 X-Git-Url: https://git.lorimer.id.au/gitweb.git/diff_plain/27cdbdd134f181fc97f9589039ed7c0d12759b5a?hp=-c Merge branch 'jk/no-sigpipe-during-network-transport' On platforms where "git fetch" is killed with SIGPIPE (e.g. OSX), the upload-pack that runs on the other end that hangs up after detecting an error could cause "git fetch" to die with a signal, which led to a flakey test. "git fetch" now ignores SIGPIPE during the network portion of its operation (this is not a problem as we check the return status from our write(2)s). * jk/no-sigpipe-during-network-transport: fetch: ignore SIGPIPE during network operation fetch: avoid calling write_or_die() --- 27cdbdd134f181fc97f9589039ed7c0d12759b5a diff --combined builtin/fetch.c index b620fd54b4,d10a67cc1c..4ba63d5ac6 --- a/builtin/fetch.c +++ b/builtin/fetch.c @@@ -317,7 -317,8 +317,7 @@@ static void find_non_local_tags(const s !has_object_file_with_flags(&ref->old_oid, OBJECT_INFO_QUICK) && !will_fetch(head, ref->old_oid.hash) && - !has_sha1_file_with_flags(item->oid.hash, - OBJECT_INFO_QUICK) && + !has_object_file_with_flags(&item->oid, OBJECT_INFO_QUICK) && !will_fetch(head, item->oid.hash)) oidclr(&item->oid); item = NULL; @@@ -331,7 -332,7 +331,7 @@@ * fetch. */ if (item && - !has_sha1_file_with_flags(item->oid.hash, OBJECT_INFO_QUICK) && + !has_object_file_with_flags(&item->oid, OBJECT_INFO_QUICK) && !will_fetch(head, item->oid.hash)) oidclr(&item->oid); @@@ -352,7 -353,7 +352,7 @@@ * checked to see if it needs fetching. */ if (item && - !has_sha1_file_with_flags(item->oid.hash, OBJECT_INFO_QUICK) && + !has_object_file_with_flags(&item->oid, OBJECT_INFO_QUICK) && !will_fetch(head, item->oid.hash)) oidclr(&item->oid); @@@ -628,14 -629,9 +628,14 @@@ static int find_and_replace(struct strb const char *needle, const char *placeholder) { - const char *p = strstr(haystack->buf, needle); + const char *p = NULL; int plen, nlen; + nlen = strlen(needle); + if (ends_with(haystack->buf, needle)) + p = haystack->buf + haystack->len - nlen; + else + p = strstr(haystack->buf, needle); if (!p) return 0; @@@ -643,6 -639,7 +643,6 @@@ return 0; plen = strlen(p); - nlen = strlen(needle); if (plen > nlen && p[nlen] != '/') return 0; @@@ -766,6 -763,9 +766,6 @@@ static int update_local_ref(struct ref what = _("[new ref]"); } - if ((recurse_submodules != RECURSE_SUBMODULES_OFF) && - (recurse_submodules != RECURSE_SUBMODULES_ON)) - check_for_new_submodule_commits(&ref->new_oid); r = s_update_ref(msg, ref, 0); format_display(display, r ? '!' : '*', what, r ? _("unable to update local ref") : NULL, @@@ -779,6 -779,9 +779,6 @@@ strbuf_add_unique_abbrev(&quickref, ¤t->object.oid, DEFAULT_ABBREV); strbuf_addstr(&quickref, ".."); strbuf_add_unique_abbrev(&quickref, &ref->new_oid, DEFAULT_ABBREV); - if ((recurse_submodules != RECURSE_SUBMODULES_OFF) && - (recurse_submodules != RECURSE_SUBMODULES_ON)) - check_for_new_submodule_commits(&ref->new_oid); r = s_update_ref("fast-forward", ref, 1); format_display(display, r ? '!' : ' ', quickref.buf, r ? _("unable to update local ref") : NULL, @@@ -791,6 -794,9 +791,6 @@@ strbuf_add_unique_abbrev(&quickref, ¤t->object.oid, DEFAULT_ABBREV); strbuf_addstr(&quickref, "..."); strbuf_add_unique_abbrev(&quickref, &ref->new_oid, DEFAULT_ABBREV); - if ((recurse_submodules != RECURSE_SUBMODULES_OFF) && - (recurse_submodules != RECURSE_SUBMODULES_ON)) - check_for_new_submodule_commits(&ref->new_oid); r = s_update_ref("forced-update", ref, 1); format_display(display, r ? '!' : '+', quickref.buf, r ? _("unable to update local ref") : _("forced update"), @@@ -886,8 -892,6 +886,8 @@@ static int store_updated_refs(const cha ref->force = rm->peer_ref->force; } + if (recurse_submodules != RECURSE_SUBMODULES_OFF) + check_for_new_submodule_commits(&rm->old_oid); if (!strcmp(rm->name, "HEAD")) { kind = ""; @@@ -1168,7 -1172,6 +1168,7 @@@ static void add_negotiation_tips(struc static struct transport *prepare_transport(struct remote *remote, int deepen) { struct transport *transport; + transport = transport_get(remote, NULL); transport_set_verbosity(transport, verbosity, progress); transport->family = family; @@@ -1188,13 -1191,9 +1188,13 @@@ if (update_shallow) set_option(transport, TRANS_OPT_UPDATE_SHALLOW, "yes"); if (filter_options.choice) { + struct strbuf expanded_filter_spec = STRBUF_INIT; + expand_list_objects_filter_spec(&filter_options, + &expanded_filter_spec); set_option(transport, TRANS_OPT_LIST_OBJECTS_FILTER, - filter_options.filter_spec); + expanded_filter_spec.buf); set_option(transport, TRANS_OPT_FROM_PROMISOR, "1"); + strbuf_release(&expanded_filter_spec); } if (negotiation_tip.nr) { if (transport->smart_options) @@@ -1479,8 -1478,7 +1479,8 @@@ static inline void fetch_one_setup_part */ if (strcmp(remote->name, repository_format_partial_clone)) { if (filter_options.choice) - die(_("--filter can only be used with the remote configured in core.partialClone")); + die(_("--filter can only be used with the remote " + "configured in extensions.partialClone")); return; } @@@ -1556,7 -1554,9 +1556,9 @@@ static int fetch_one(struct remote *rem sigchain_push_common(unlock_pack_on_signal); atexit(unlock_pack); + sigchain_push(SIGPIPE, SIG_IGN); exit_code = do_fetch(gtransport, &rs); + sigchain_pop(SIGPIPE); refspec_clear(&rs); transport_disconnect(gtransport); gtransport = NULL; @@@ -1648,8 -1648,7 +1650,8 @@@ int cmd_fetch(int argc, const char **ar result = fetch_one(remote, argc, argv, prune_tags_ok); } else { if (filter_options.choice) - die(_("--filter can only be used with the remote configured in core.partialClone")); + die(_("--filter can only be used with the remote " + "configured in extensions.partialclone")); /* TODO should this also die if we have a previous partial-clone? */ result = fetch_multiple(&list); } diff --combined fetch-pack.c index 812be15d7e,1dcb0f7cd7..e69993b2eb --- a/fetch-pack.c +++ b/fetch-pack.c @@@ -135,42 -135,38 +135,42 @@@ enum ack_type ACK_ready }; -static void consume_shallow_list(struct fetch_pack_args *args, int fd) +static void consume_shallow_list(struct fetch_pack_args *args, + struct packet_reader *reader) { if (args->stateless_rpc && args->deepen) { /* If we sent a depth we will get back "duplicate" * shallow and unshallow commands every time there * is a block of have lines exchanged. */ - char *line; - while ((line = packet_read_line(fd, NULL))) { - if (starts_with(line, "shallow ")) + while (packet_reader_read(reader) == PACKET_READ_NORMAL) { + if (starts_with(reader->line, "shallow ")) continue; - if (starts_with(line, "unshallow ")) + if (starts_with(reader->line, "unshallow ")) continue; die(_("git fetch-pack: expected shallow list")); } + if (reader->status != PACKET_READ_FLUSH) + die(_("git fetch-pack: expected a flush packet after shallow list")); } } -static enum ack_type get_ack(int fd, struct object_id *result_oid) +static enum ack_type get_ack(struct packet_reader *reader, + struct object_id *result_oid) { int len; - char *line = packet_read_line(fd, &len); const char *arg; - if (!line) + if (packet_reader_read(reader) != PACKET_READ_NORMAL) die(_("git fetch-pack: expected ACK/NAK, got a flush packet")); - if (!strcmp(line, "NAK")) + len = reader->pktlen; + + if (!strcmp(reader->line, "NAK")) return NAK; - if (skip_prefix(line, "ACK ", &arg)) { + if (skip_prefix(reader->line, "ACK ", &arg)) { if (!get_oid_hex(arg, result_oid)) { arg += 40; - len -= arg - line; + len -= arg - reader->line; if (len < 1) return ACK; if (strstr(arg, "continue")) @@@ -182,7 -178,9 +182,7 @@@ return ACK; } } - if (skip_prefix(line, "ERR ", &arg)) - die(_("remote error: %s"), arg); - die(_("git fetch-pack: expected ACK/NAK, got '%s'"), line); + die(_("git fetch-pack: expected ACK/NAK, got '%s'"), reader->line); } static void send_request(struct fetch_pack_args *args, @@@ -191,8 -189,10 +191,10 @@@ if (args->stateless_rpc) { send_sideband(fd, -1, buf->buf, buf->len, LARGE_PACKET_MAX); packet_flush(fd); - } else - write_or_die(fd, buf->buf, buf->len); + } else { + if (write_in_full(fd, buf->buf, buf->len) < 0) + die_errno(_("unable to write to remote")); + } } static void insert_one_alternate_object(struct fetch_negotiator *negotiator, @@@ -250,15 -250,10 +252,15 @@@ static int find_common(struct fetch_neg int got_ready = 0; struct strbuf req_buf = STRBUF_INIT; size_t state_len = 0; + struct packet_reader reader; if (args->stateless_rpc && multi_ack == 1) die(_("--stateless-rpc requires multi_ack_detailed")); + packet_reader_init(&reader, fd[0], NULL, 0, + PACKET_READ_CHOMP_NEWLINE | + PACKET_READ_DIE_ON_ERR_PACKET); + if (!args->no_dependents) { mark_tips(negotiator, args->negotiation_tips); for_each_cached_alternate(negotiator, insert_one_alternate_object); @@@ -336,42 -331,38 +338,42 @@@ packet_buf_write(&req_buf, "deepen-not %s", s->string); } } - if (server_supports_filtering && args->filter_options.choice) + if (server_supports_filtering && args->filter_options.choice) { + struct strbuf expanded_filter_spec = STRBUF_INIT; + expand_list_objects_filter_spec(&args->filter_options, + &expanded_filter_spec); packet_buf_write(&req_buf, "filter %s", - args->filter_options.filter_spec); + expanded_filter_spec.buf); + strbuf_release(&expanded_filter_spec); + } packet_buf_flush(&req_buf); state_len = req_buf.len; if (args->deepen) { - char *line; const char *arg; struct object_id oid; send_request(args, fd[1], &req_buf); - while ((line = packet_read_line(fd[0], NULL))) { - if (skip_prefix(line, "shallow ", &arg)) { + while (packet_reader_read(&reader) == PACKET_READ_NORMAL) { + if (skip_prefix(reader.line, "shallow ", &arg)) { if (get_oid_hex(arg, &oid)) - die(_("invalid shallow line: %s"), line); + die(_("invalid shallow line: %s"), reader.line); register_shallow(the_repository, &oid); continue; } - if (skip_prefix(line, "unshallow ", &arg)) { + if (skip_prefix(reader.line, "unshallow ", &arg)) { if (get_oid_hex(arg, &oid)) - die(_("invalid unshallow line: %s"), line); + die(_("invalid unshallow line: %s"), reader.line); if (!lookup_object(the_repository, oid.hash)) - die(_("object not found: %s"), line); + die(_("object not found: %s"), reader.line); /* make sure that it is parsed as shallow */ if (!parse_object(the_repository, &oid)) - die(_("error in object: %s"), line); + die(_("error in object: %s"), reader.line); if (unregister_shallow(&oid)) - die(_("no shallow found: %s"), line); + die(_("no shallow found: %s"), reader.line); continue; } - die(_("expected shallow/unshallow, got %s"), line); + die(_("expected shallow/unshallow, got %s"), reader.line); } } else if (!args->stateless_rpc) send_request(args, fd[1], &req_buf); @@@ -408,9 -399,9 +410,9 @@@ if (!args->stateless_rpc && count == INITIAL_FLUSH) continue; - consume_shallow_list(args, fd[0]); + consume_shallow_list(args, &reader); do { - ack = get_ack(fd[0], result_oid); + ack = get_ack(&reader, result_oid); if (ack) print_verbose(args, _("got %s %d %s"), "ack", ack, oid_to_hex(result_oid)); @@@ -480,9 -471,9 +482,9 @@@ done strbuf_release(&req_buf); if (!got_ready || !no_done) - consume_shallow_list(args, fd[0]); + consume_shallow_list(args, &reader); while (flushes || multi_ack) { - int ack = get_ack(fd[0], result_oid); + int ack = get_ack(&reader, result_oid); if (ack) { print_verbose(args, _("got %s (%d) %s"), "ack", ack, oid_to_hex(result_oid)); @@@ -647,6 -638,23 +649,6 @@@ struct loose_object_iter struct ref *refs; }; -/* - * If the number of refs is not larger than the number of loose objects, - * this function stops inserting. - */ -static int add_loose_objects_to_set(const struct object_id *oid, - const char *path, - void *data) -{ - struct loose_object_iter *iter = data; - oidset_insert(iter->loose_object_set, oid); - if (iter->refs == NULL) - return 1; - - iter->refs = iter->refs->next; - return 0; -} - /* * Mark recent commits available locally and reachable from a local ref as * COMPLETE. If args->no_dependents is false, also mark COMPLETE remote refs as @@@ -664,14 -672,30 +666,14 @@@ static void mark_complete_and_common_re struct ref *ref; int old_save_commit_buffer = save_commit_buffer; timestamp_t cutoff = 0; - struct oidset loose_oid_set = OIDSET_INIT; - int use_oidset = 0; - struct loose_object_iter iter = {&loose_oid_set, *refs}; - - /* Enumerate all loose objects or know refs are not so many. */ - use_oidset = !for_each_loose_object(add_loose_objects_to_set, - &iter, 0); save_commit_buffer = 0; for (ref = *refs; ref; ref = ref->next) { struct object *o; - unsigned int flags = OBJECT_INFO_QUICK; - if (use_oidset && - !oidset_contains(&loose_oid_set, &ref->old_oid)) { - /* - * I know this does not exist in the loose form, - * so check if it exists in a non-loose form. - */ - flags |= OBJECT_INFO_IGNORE_LOOSE; - } - - if (!has_object_file_with_flags(&ref->old_oid, flags)) + if (!has_object_file_with_flags(&ref->old_oid, + OBJECT_INFO_QUICK)) continue; o = parse_object(the_repository, &ref->old_oid); if (!o) @@@ -688,6 -712,8 +690,6 @@@ } } - oidset_clear(&loose_oid_set); - if (!args->deepen) { for_each_ref(mark_complete_oid, NULL); for_each_cached_alternate(NULL, mark_alternate_complete); @@@ -1018,8 -1044,6 +1020,8 @@@ static void add_shallow_requests(struc packet_buf_write(req_buf, "deepen-not %s", s->string); } } + if (args->deepen_relative) + packet_buf_write(req_buf, "deepen-relative\n"); } static void add_wants(int no_dependents, const struct ref *wants, struct strbuf *req_buf) @@@ -1097,8 -1121,7 +1099,8 @@@ static int add_haves(struct fetch_negot static int send_fetch_request(struct fetch_negotiator *negotiator, int fd_out, const struct fetch_pack_args *args, const struct ref *wants, struct oidset *common, - int *haves_to_send, int *in_vain) + int *haves_to_send, int *in_vain, + int sideband_all) { int ret = 0; struct strbuf req_buf = STRBUF_INIT; @@@ -1124,8 -1147,6 +1126,8 @@@ packet_buf_write(&req_buf, "include-tag"); if (prefer_ofs_delta) packet_buf_write(&req_buf, "ofs-delta"); + if (sideband_all) + packet_buf_write(&req_buf, "sideband-all"); /* Add shallow-info and deepen request */ if (server_supports_feature("fetch", "shallow", 0)) @@@ -1136,13 -1157,9 +1138,13 @@@ /* Add filter */ if (server_supports_feature("fetch", "filter", 0) && args->filter_options.choice) { + struct strbuf expanded_filter_spec = STRBUF_INIT; print_verbose(args, _("Server supports filter")); + expand_list_objects_filter_spec(&args->filter_options, + &expanded_filter_spec); packet_buf_write(&req_buf, "filter %s", - args->filter_options.filter_spec); + expanded_filter_spec.buf); + strbuf_release(&expanded_filter_spec); } else if (args->filter_options.choice) { warning("filtering not recognized by server, ignoring"); } @@@ -1163,7 -1180,8 +1165,8 @@@ /* Send request */ packet_buf_flush(&req_buf); - write_or_die(fd_out, req_buf.buf, req_buf.len); + if (write_in_full(fd_out, req_buf.buf, req_buf.len) < 0) + die_errno(_("unable to write request to remote")); strbuf_release(&req_buf); return ret; @@@ -1252,8 -1270,6 +1255,8 @@@ static int process_acks(struct fetch_ne static void receive_shallow_info(struct fetch_pack_args *args, struct packet_reader *reader) { + int line_received = 0; + process_section_header(reader, "shallow-info", 0); while (packet_reader_read(reader) == PACKET_READ_NORMAL) { const char *arg; @@@ -1263,7 -1279,6 +1266,7 @@@ if (get_oid_hex(arg, &oid)) die(_("invalid shallow line: %s"), reader->line); register_shallow(the_repository, &oid); + line_received = 1; continue; } if (skip_prefix(reader->line, "unshallow ", &arg)) { @@@ -1276,7 -1291,6 +1279,7 @@@ die(_("error in object: %s"), reader->line); if (unregister_shallow(&oid)) die(_("no shallow found: %s"), reader->line); + line_received = 1; continue; } die(_("expected shallow/unshallow, got %s"), reader->line); @@@ -1286,13 -1300,8 +1289,13 @@@ reader->status != PACKET_READ_DELIM) die(_("error processing shallow info: %d"), reader->status); - setup_alternate_shallow(&shallow_lock, &alternate_shallow_file, NULL); - args->deepen = 1; + if (line_received) { + setup_alternate_shallow(&shallow_lock, &alternate_shallow_file, + NULL); + args->deepen = 1; + } else { + alternate_shallow_file = NULL; + } } static void receive_wanted_refs(struct packet_reader *reader, @@@ -1345,13 -1354,7 +1348,13 @@@ static struct ref *do_fetch_pack_v2(str struct fetch_negotiator negotiator; fetch_negotiator_init(&negotiator, negotiation_algorithm); packet_reader_init(&reader, fd[0], NULL, 0, - PACKET_READ_CHOMP_NEWLINE); + PACKET_READ_CHOMP_NEWLINE | + PACKET_READ_DIE_ON_ERR_PACKET); + if (git_env_bool("GIT_TEST_SIDEBAND_ALL", 1) && + server_supports_feature("fetch", "sideband-all", 0)) { + reader.use_sideband = 1; + reader.me = "fetch-pack"; + } while (state != FETCH_DONE) { switch (state) { @@@ -1385,8 -1388,7 +1388,8 @@@ case FETCH_SEND_REQUEST: if (send_fetch_request(&negotiator, fd[1], args, ref, &common, - &haves_to_send, &in_vain)) + &haves_to_send, &in_vain, + reader.use_sideband)) state = FETCH_GET_PACK; else state = FETCH_PROCESS_ACKS; @@@ -1516,7 -1518,6 +1519,7 @@@ static void update_shallow(struct fetch rollback_lock_file(&shallow_lock); } else commit_lock_file(&shallow_lock); + alternate_shallow_file = NULL; return; } @@@ -1540,7 -1541,6 +1543,7 @@@ &alternate_shallow_file, &extra); commit_lock_file(&shallow_lock); + alternate_shallow_file = NULL; } oid_array_clear(&extra); return; @@@ -1580,7 -1580,6 +1583,7 @@@ commit_lock_file(&shallow_lock); oid_array_clear(&extra); oid_array_clear(&ref); + alternate_shallow_file = NULL; return; } diff --combined pkt-line.c index 60329b301b,9944c7c75d..ffd7220544 --- a/pkt-line.c +++ b/pkt-line.c @@@ -88,13 -88,15 +88,15 @@@ static void packet_trace(const char *bu void packet_flush(int fd) { packet_trace("0000", 4, 1); - write_or_die(fd, "0000", 4); + if (write_in_full(fd, "0000", 4) < 0) + die_errno(_("unable to write flush packet")); } void packet_delim(int fd) { packet_trace("0001", 4, 1); - write_or_die(fd, "0001", 4); + if (write_in_full(fd, "0001", 4) < 0) + die_errno(_("unable to write delim packet")); } int packet_flush_gently(int fd) @@@ -117,7 -119,7 +119,7 @@@ void packet_buf_delim(struct strbuf *bu strbuf_add(buf, "0001", 4); } -static void set_packet_header(char *buf, const int size) +void set_packet_header(char *buf, const int size) { static char hexchar[] = "0123456789abcdef"; @@@ -129,14 -131,12 +131,14 @@@ #undef hex } -static void format_packet(struct strbuf *out, const char *fmt, va_list args) +static void format_packet(struct strbuf *out, const char *prefix, + const char *fmt, va_list args) { size_t orig_len, n; orig_len = out->len; strbuf_addstr(out, "0000"); + strbuf_addstr(out, prefix); strbuf_vaddf(out, fmt, args); n = out->len - orig_len; @@@ -147,13 -147,13 +149,13 @@@ packet_trace(out->buf + orig_len + 4, n - 4, 1); } -static int packet_write_fmt_1(int fd, int gently, +static int packet_write_fmt_1(int fd, int gently, const char *prefix, const char *fmt, va_list args) { static struct strbuf buf = STRBUF_INIT; strbuf_reset(&buf); - format_packet(&buf, fmt, args); + format_packet(&buf, prefix, fmt, args); if (write_in_full(fd, buf.buf, buf.len) < 0) { if (!gently) { check_pipe(errno); @@@ -170,7 -170,7 +172,7 @@@ void packet_write_fmt(int fd, const cha va_list args; va_start(args, fmt); - packet_write_fmt_1(fd, 0, fmt, args); + packet_write_fmt_1(fd, 0, "", fmt, args); va_end(args); } @@@ -180,7 -180,7 +182,7 @@@ int packet_write_fmt_gently(int fd, con va_list args; va_start(args, fmt); - status = packet_write_fmt_1(fd, 1, fmt, args); + status = packet_write_fmt_1(fd, 1, "", fmt, args); va_end(args); return status; } @@@ -213,7 -213,7 +215,7 @@@ void packet_buf_write(struct strbuf *bu va_list args; va_start(args, fmt); - format_packet(buf, fmt, args); + format_packet(buf, "", fmt, args); va_end(args); } @@@ -348,10 -348,6 +350,10 @@@ enum packet_read_status packet_read_wit return PACKET_READ_EOF; } + if ((options & PACKET_READ_DIE_ON_ERR_PACKET) && + starts_with(buffer, "ERR ")) + die(_("remote error: %s"), buffer + 4); + if ((options & PACKET_READ_CHOMP_NEWLINE) && len && buffer[len-1] == '\n') len--; @@@ -439,29 -435,6 +441,29 @@@ ssize_t read_packetized_to_strbuf(int f return sb_out->len - orig_len; } +int recv_sideband(const char *me, int in_stream, int out) +{ + char buf[LARGE_PACKET_MAX + 1]; + int len; + struct strbuf scratch = STRBUF_INIT; + enum sideband_type sideband_type; + + while (1) { + len = packet_read(in_stream, NULL, NULL, buf, LARGE_PACKET_MAX, + 0); + if (!demultiplex_sideband(me, buf, len, 0, &scratch, + &sideband_type)) + continue; + switch (sideband_type) { + case SIDEBAND_PRIMARY: + write_or_die(out, buf + 1, len - 1); + break; + default: /* errors: message already written */ + return sideband_type; + } + } +} + /* Packet Reader Functions */ void packet_reader_init(struct packet_reader *reader, int fd, char *src_buffer, size_t src_len, @@@ -475,43 -448,25 +477,43 @@@ reader->buffer = packet_buffer; reader->buffer_size = sizeof(packet_buffer); reader->options = options; + reader->me = "git"; } enum packet_read_status packet_reader_read(struct packet_reader *reader) { + struct strbuf scratch = STRBUF_INIT; + if (reader->line_peeked) { reader->line_peeked = 0; return reader->status; } - reader->status = packet_read_with_status(reader->fd, - &reader->src_buffer, - &reader->src_len, - reader->buffer, - reader->buffer_size, - &reader->pktlen, - reader->options); + /* + * Consume all progress packets until a primary payload packet is + * received + */ + while (1) { + enum sideband_type sideband_type; + reader->status = packet_read_with_status(reader->fd, + &reader->src_buffer, + &reader->src_len, + reader->buffer, + reader->buffer_size, + &reader->pktlen, + reader->options); + if (!reader->use_sideband) + break; + if (demultiplex_sideband(reader->me, reader->buffer, + reader->pktlen, 1, &scratch, + &sideband_type)) + break; + } if (reader->status == PACKET_READ_NORMAL) - reader->line = reader->buffer; + /* Skip the sideband designator if sideband is used */ + reader->line = reader->use_sideband ? + reader->buffer + 1 : reader->buffer; else reader->line = NULL; @@@ -529,39 -484,3 +531,39 @@@ enum packet_read_status packet_reader_p reader->line_peeked = 1; return reader->status; } + +void packet_writer_init(struct packet_writer *writer, int dest_fd) +{ + writer->dest_fd = dest_fd; + writer->use_sideband = 0; +} + +void packet_writer_write(struct packet_writer *writer, const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + packet_write_fmt_1(writer->dest_fd, 0, + writer->use_sideband ? "\001" : "", fmt, args); + va_end(args); +} + +void packet_writer_error(struct packet_writer *writer, const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + packet_write_fmt_1(writer->dest_fd, 0, + writer->use_sideband ? "\003" : "ERR ", fmt, args); + va_end(args); +} + +void packet_writer_delim(struct packet_writer *writer) +{ + packet_delim(writer->dest_fd); +} + +void packet_writer_flush(struct packet_writer *writer) +{ + packet_flush(writer->dest_fd); +}