streaming.con commit Merge branch 'dl/use-sq-from-test-lib' (d693345)
   1/*
   2 * Copyright (c) 2011, Google Inc.
   3 */
   4#include "cache.h"
   5#include "streaming.h"
   6#include "repository.h"
   7#include "object-store.h"
   8#include "replace-object.h"
   9#include "packfile.h"
  10
  11enum input_source {
  12        stream_error = -1,
  13        incore = 0,
  14        loose = 1,
  15        pack_non_delta = 2
  16};
  17
  18typedef int (*open_istream_fn)(struct git_istream *,
  19                               struct object_info *,
  20                               const struct object_id *,
  21                               enum object_type *);
  22typedef int (*close_istream_fn)(struct git_istream *);
  23typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);
  24
  25struct stream_vtbl {
  26        close_istream_fn close;
  27        read_istream_fn read;
  28};
  29
  30#define open_method_decl(name) \
  31        int open_istream_ ##name \
  32        (struct git_istream *st, struct object_info *oi, \
  33         const struct object_id *oid, \
  34         enum object_type *type)
  35
  36#define close_method_decl(name) \
  37        int close_istream_ ##name \
  38        (struct git_istream *st)
  39
  40#define read_method_decl(name) \
  41        ssize_t read_istream_ ##name \
  42        (struct git_istream *st, char *buf, size_t sz)
  43
  44/* forward declaration */
  45static open_method_decl(incore);
  46static open_method_decl(loose);
  47static open_method_decl(pack_non_delta);
  48static struct git_istream *attach_stream_filter(struct git_istream *st,
  49                                                struct stream_filter *filter);
  50
  51
  52static open_istream_fn open_istream_tbl[] = {
  53        open_istream_incore,
  54        open_istream_loose,
  55        open_istream_pack_non_delta,
  56};
  57
  58#define FILTER_BUFFER (1024*16)
  59
  60struct filtered_istream {
  61        struct git_istream *upstream;
  62        struct stream_filter *filter;
  63        char ibuf[FILTER_BUFFER];
  64        char obuf[FILTER_BUFFER];
  65        int i_end, i_ptr;
  66        int o_end, o_ptr;
  67        int input_finished;
  68};
  69
  70struct git_istream {
  71        const struct stream_vtbl *vtbl;
  72        unsigned long size; /* inflated size of full object */
  73        git_zstream z;
  74        enum { z_unused, z_used, z_done, z_error } z_state;
  75
  76        union {
  77                struct {
  78                        char *buf; /* from read_object() */
  79                        unsigned long read_ptr;
  80                } incore;
  81
  82                struct {
  83                        void *mapped;
  84                        unsigned long mapsize;
  85                        char hdr[32];
  86                        int hdr_avail;
  87                        int hdr_used;
  88                } loose;
  89
  90                struct {
  91                        struct packed_git *pack;
  92                        off_t pos;
  93                } in_pack;
  94
  95                struct filtered_istream filtered;
  96        } u;
  97};
  98
  99int close_istream(struct git_istream *st)
 100{
 101        int r = st->vtbl->close(st);
 102        free(st);
 103        return r;
 104}
 105
 106ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)
 107{
 108        return st->vtbl->read(st, buf, sz);
 109}
 110
 111static enum input_source istream_source(const struct object_id *oid,
 112                                        enum object_type *type,
 113                                        struct object_info *oi)
 114{
 115        unsigned long size;
 116        int status;
 117
 118        oi->typep = type;
 119        oi->sizep = &size;
 120        status = oid_object_info_extended(the_repository, oid, oi, 0);
 121        if (status < 0)
 122                return stream_error;
 123
 124        switch (oi->whence) {
 125        case OI_LOOSE:
 126                return loose;
 127        case OI_PACKED:
 128                if (!oi->u.packed.is_delta && big_file_threshold < size)
 129                        return pack_non_delta;
 130                /* fallthru */
 131        default:
 132                return incore;
 133        }
 134}
 135
 136struct git_istream *open_istream(const struct object_id *oid,
 137                                 enum object_type *type,
 138                                 unsigned long *size,
 139                                 struct stream_filter *filter)
 140{
 141        struct git_istream *st;
 142        struct object_info oi = OBJECT_INFO_INIT;
 143        const struct object_id *real = lookup_replace_object(the_repository, oid);
 144        enum input_source src = istream_source(real, type, &oi);
 145
 146        if (src < 0)
 147                return NULL;
 148
 149        st = xmalloc(sizeof(*st));
 150        if (open_istream_tbl[src](st, &oi, real, type)) {
 151                if (open_istream_incore(st, &oi, real, type)) {
 152                        free(st);
 153                        return NULL;
 154                }
 155        }
 156        if (filter) {
 157                /* Add "&& !is_null_stream_filter(filter)" for performance */
 158                struct git_istream *nst = attach_stream_filter(st, filter);
 159                if (!nst) {
 160                        close_istream(st);
 161                        return NULL;
 162                }
 163                st = nst;
 164        }
 165
 166        *size = st->size;
 167        return st;
 168}
 169
 170
 171/*****************************************************************
 172 *
 173 * Common helpers
 174 *
 175 *****************************************************************/
 176
 177static void close_deflated_stream(struct git_istream *st)
 178{
 179        if (st->z_state == z_used)
 180                git_inflate_end(&st->z);
 181}
 182
 183
 184/*****************************************************************
 185 *
 186 * Filtered stream
 187 *
 188 *****************************************************************/
 189
 190static close_method_decl(filtered)
 191{
 192        free_stream_filter(st->u.filtered.filter);
 193        return close_istream(st->u.filtered.upstream);
 194}
 195
 196static read_method_decl(filtered)
 197{
 198        struct filtered_istream *fs = &(st->u.filtered);
 199        size_t filled = 0;
 200
 201        while (sz) {
 202                /* do we already have filtered output? */
 203                if (fs->o_ptr < fs->o_end) {
 204                        size_t to_move = fs->o_end - fs->o_ptr;
 205                        if (sz < to_move)
 206                                to_move = sz;
 207                        memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
 208                        fs->o_ptr += to_move;
 209                        sz -= to_move;
 210                        filled += to_move;
 211                        continue;
 212                }
 213                fs->o_end = fs->o_ptr = 0;
 214
 215                /* do we have anything to feed the filter with? */
 216                if (fs->i_ptr < fs->i_end) {
 217                        size_t to_feed = fs->i_end - fs->i_ptr;
 218                        size_t to_receive = FILTER_BUFFER;
 219                        if (stream_filter(fs->filter,
 220                                          fs->ibuf + fs->i_ptr, &to_feed,
 221                                          fs->obuf, &to_receive))
 222                                return -1;
 223                        fs->i_ptr = fs->i_end - to_feed;
 224                        fs->o_end = FILTER_BUFFER - to_receive;
 225                        continue;
 226                }
 227
 228                /* tell the filter to drain upon no more input */
 229                if (fs->input_finished) {
 230                        size_t to_receive = FILTER_BUFFER;
 231                        if (stream_filter(fs->filter,
 232                                          NULL, NULL,
 233                                          fs->obuf, &to_receive))
 234                                return -1;
 235                        fs->o_end = FILTER_BUFFER - to_receive;
 236                        if (!fs->o_end)
 237                                break;
 238                        continue;
 239                }
 240                fs->i_end = fs->i_ptr = 0;
 241
 242                /* refill the input from the upstream */
 243                if (!fs->input_finished) {
 244                        fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
 245                        if (fs->i_end < 0)
 246                                return -1;
 247                        if (fs->i_end)
 248                                continue;
 249                }
 250                fs->input_finished = 1;
 251        }
 252        return filled;
 253}
 254
 255static struct stream_vtbl filtered_vtbl = {
 256        close_istream_filtered,
 257        read_istream_filtered,
 258};
 259
 260static struct git_istream *attach_stream_filter(struct git_istream *st,
 261                                                struct stream_filter *filter)
 262{
 263        struct git_istream *ifs = xmalloc(sizeof(*ifs));
 264        struct filtered_istream *fs = &(ifs->u.filtered);
 265
 266        ifs->vtbl = &filtered_vtbl;
 267        fs->upstream = st;
 268        fs->filter = filter;
 269        fs->i_end = fs->i_ptr = 0;
 270        fs->o_end = fs->o_ptr = 0;
 271        fs->input_finished = 0;
 272        ifs->size = -1; /* unknown */
 273        return ifs;
 274}
 275
 276/*****************************************************************
 277 *
 278 * Loose object stream
 279 *
 280 *****************************************************************/
 281
 282static read_method_decl(loose)
 283{
 284        size_t total_read = 0;
 285
 286        switch (st->z_state) {
 287        case z_done:
 288                return 0;
 289        case z_error:
 290                return -1;
 291        default:
 292                break;
 293        }
 294
 295        if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
 296                size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
 297                if (sz < to_copy)
 298                        to_copy = sz;
 299                memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
 300                st->u.loose.hdr_used += to_copy;
 301                total_read += to_copy;
 302        }
 303
 304        while (total_read < sz) {
 305                int status;
 306
 307                st->z.next_out = (unsigned char *)buf + total_read;
 308                st->z.avail_out = sz - total_read;
 309                status = git_inflate(&st->z, Z_FINISH);
 310
 311                total_read = st->z.next_out - (unsigned char *)buf;
 312
 313                if (status == Z_STREAM_END) {
 314                        git_inflate_end(&st->z);
 315                        st->z_state = z_done;
 316                        break;
 317                }
 318                if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {
 319                        git_inflate_end(&st->z);
 320                        st->z_state = z_error;
 321                        return -1;
 322                }
 323        }
 324        return total_read;
 325}
 326
 327static close_method_decl(loose)
 328{
 329        close_deflated_stream(st);
 330        munmap(st->u.loose.mapped, st->u.loose.mapsize);
 331        return 0;
 332}
 333
 334static struct stream_vtbl loose_vtbl = {
 335        close_istream_loose,
 336        read_istream_loose,
 337};
 338
 339static open_method_decl(loose)
 340{
 341        st->u.loose.mapped = map_loose_object(the_repository,
 342                                              oid, &st->u.loose.mapsize);
 343        if (!st->u.loose.mapped)
 344                return -1;
 345        if ((unpack_loose_header(&st->z,
 346                                 st->u.loose.mapped,
 347                                 st->u.loose.mapsize,
 348                                 st->u.loose.hdr,
 349                                 sizeof(st->u.loose.hdr)) < 0) ||
 350            (parse_loose_header(st->u.loose.hdr, &st->size) < 0)) {
 351                git_inflate_end(&st->z);
 352                munmap(st->u.loose.mapped, st->u.loose.mapsize);
 353                return -1;
 354        }
 355
 356        st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
 357        st->u.loose.hdr_avail = st->z.total_out;
 358        st->z_state = z_used;
 359
 360        st->vtbl = &loose_vtbl;
 361        return 0;
 362}
 363
 364
 365/*****************************************************************
 366 *
 367 * Non-delta packed object stream
 368 *
 369 *****************************************************************/
 370
 371static read_method_decl(pack_non_delta)
 372{
 373        size_t total_read = 0;
 374
 375        switch (st->z_state) {
 376        case z_unused:
 377                memset(&st->z, 0, sizeof(st->z));
 378                git_inflate_init(&st->z);
 379                st->z_state = z_used;
 380                break;
 381        case z_done:
 382                return 0;
 383        case z_error:
 384                return -1;
 385        case z_used:
 386                break;
 387        }
 388
 389        while (total_read < sz) {
 390                int status;
 391                struct pack_window *window = NULL;
 392                unsigned char *mapped;
 393
 394                mapped = use_pack(st->u.in_pack.pack, &window,
 395                                  st->u.in_pack.pos, &st->z.avail_in);
 396
 397                st->z.next_out = (unsigned char *)buf + total_read;
 398                st->z.avail_out = sz - total_read;
 399                st->z.next_in = mapped;
 400                status = git_inflate(&st->z, Z_FINISH);
 401
 402                st->u.in_pack.pos += st->z.next_in - mapped;
 403                total_read = st->z.next_out - (unsigned char *)buf;
 404                unuse_pack(&window);
 405
 406                if (status == Z_STREAM_END) {
 407                        git_inflate_end(&st->z);
 408                        st->z_state = z_done;
 409                        break;
 410                }
 411
 412                /*
 413                 * Unlike the loose object case, we do not have to worry here
 414                 * about running out of input bytes and spinning infinitely. If
 415                 * we get Z_BUF_ERROR due to too few input bytes, then we'll
 416                 * replenish them in the next use_pack() call when we loop. If
 417                 * we truly hit the end of the pack (i.e., because it's corrupt
 418                 * or truncated), then use_pack() catches that and will die().
 419                 */
 420                if (status != Z_OK && status != Z_BUF_ERROR) {
 421                        git_inflate_end(&st->z);
 422                        st->z_state = z_error;
 423                        return -1;
 424                }
 425        }
 426        return total_read;
 427}
 428
 429static close_method_decl(pack_non_delta)
 430{
 431        close_deflated_stream(st);
 432        return 0;
 433}
 434
 435static struct stream_vtbl pack_non_delta_vtbl = {
 436        close_istream_pack_non_delta,
 437        read_istream_pack_non_delta,
 438};
 439
 440static open_method_decl(pack_non_delta)
 441{
 442        struct pack_window *window;
 443        enum object_type in_pack_type;
 444
 445        st->u.in_pack.pack = oi->u.packed.pack;
 446        st->u.in_pack.pos = oi->u.packed.offset;
 447        window = NULL;
 448
 449        in_pack_type = unpack_object_header(st->u.in_pack.pack,
 450                                            &window,
 451                                            &st->u.in_pack.pos,
 452                                            &st->size);
 453        unuse_pack(&window);
 454        switch (in_pack_type) {
 455        default:
 456                return -1; /* we do not do deltas for now */
 457        case OBJ_COMMIT:
 458        case OBJ_TREE:
 459        case OBJ_BLOB:
 460        case OBJ_TAG:
 461                break;
 462        }
 463        st->z_state = z_unused;
 464        st->vtbl = &pack_non_delta_vtbl;
 465        return 0;
 466}
 467
 468
 469/*****************************************************************
 470 *
 471 * In-core stream
 472 *
 473 *****************************************************************/
 474
 475static close_method_decl(incore)
 476{
 477        free(st->u.incore.buf);
 478        return 0;
 479}
 480
 481static read_method_decl(incore)
 482{
 483        size_t read_size = sz;
 484        size_t remainder = st->size - st->u.incore.read_ptr;
 485
 486        if (remainder <= read_size)
 487                read_size = remainder;
 488        if (read_size) {
 489                memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
 490                st->u.incore.read_ptr += read_size;
 491        }
 492        return read_size;
 493}
 494
 495static struct stream_vtbl incore_vtbl = {
 496        close_istream_incore,
 497        read_istream_incore,
 498};
 499
 500static open_method_decl(incore)
 501{
 502        st->u.incore.buf = read_object_file_extended(the_repository, oid, type, &st->size, 0);
 503        st->u.incore.read_ptr = 0;
 504        st->vtbl = &incore_vtbl;
 505
 506        return st->u.incore.buf ? 0 : -1;
 507}
 508
 509
 510/****************************************************************
 511 * Users of streaming interface
 512 ****************************************************************/
 513
 514int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,
 515                      int can_seek)
 516{
 517        struct git_istream *st;
 518        enum object_type type;
 519        unsigned long sz;
 520        ssize_t kept = 0;
 521        int result = -1;
 522
 523        st = open_istream(oid, &type, &sz, filter);
 524        if (!st) {
 525                if (filter)
 526                        free_stream_filter(filter);
 527                return result;
 528        }
 529        if (type != OBJ_BLOB)
 530                goto close_and_exit;
 531        for (;;) {
 532                char buf[1024 * 16];
 533                ssize_t wrote, holeto;
 534                ssize_t readlen = read_istream(st, buf, sizeof(buf));
 535
 536                if (readlen < 0)
 537                        goto close_and_exit;
 538                if (!readlen)
 539                        break;
 540                if (can_seek && sizeof(buf) == readlen) {
 541                        for (holeto = 0; holeto < readlen; holeto++)
 542                                if (buf[holeto])
 543                                        break;
 544                        if (readlen == holeto) {
 545                                kept += holeto;
 546                                continue;
 547                        }
 548                }
 549
 550                if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)
 551                        goto close_and_exit;
 552                else
 553                        kept = 0;
 554                wrote = write_in_full(fd, buf, readlen);
 555
 556                if (wrote < 0)
 557                        goto close_and_exit;
 558        }
 559        if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||
 560                     xwrite(fd, "", 1) != 1))
 561                goto close_and_exit;
 562        result = 0;
 563
 564 close_and_exit:
 565        close_istream(st);
 566        return result;
 567}