/* streaming.c, as of commit 02f0547 ("sha1_file: convert read_object_with_reference to object_id") */
   1/*
   2 * Copyright (c) 2011, Google Inc.
   3 */
   4#include "cache.h"
   5#include "streaming.h"
   6#include "packfile.h"
   7
   8enum input_source {
   9        stream_error = -1,
  10        incore = 0,
  11        loose = 1,
  12        pack_non_delta = 2
  13};
  14
  15typedef int (*open_istream_fn)(struct git_istream *,
  16                               struct object_info *,
  17                               const struct object_id *,
  18                               enum object_type *);
  19typedef int (*close_istream_fn)(struct git_istream *);
  20typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);
  21
  22struct stream_vtbl {
  23        close_istream_fn close;
  24        read_istream_fn read;
  25};
  26
  27#define open_method_decl(name) \
  28        int open_istream_ ##name \
  29        (struct git_istream *st, struct object_info *oi, \
  30         const struct object_id *oid, \
  31         enum object_type *type)
  32
  33#define close_method_decl(name) \
  34        int close_istream_ ##name \
  35        (struct git_istream *st)
  36
  37#define read_method_decl(name) \
  38        ssize_t read_istream_ ##name \
  39        (struct git_istream *st, char *buf, size_t sz)
  40
  41/* forward declaration */
  42static open_method_decl(incore);
  43static open_method_decl(loose);
  44static open_method_decl(pack_non_delta);
  45static struct git_istream *attach_stream_filter(struct git_istream *st,
  46                                                struct stream_filter *filter);
  47
  48
  49static open_istream_fn open_istream_tbl[] = {
  50        open_istream_incore,
  51        open_istream_loose,
  52        open_istream_pack_non_delta,
  53};
  54
  55#define FILTER_BUFFER (1024*16)
  56
  57struct filtered_istream {
  58        struct git_istream *upstream;
  59        struct stream_filter *filter;
  60        char ibuf[FILTER_BUFFER];
  61        char obuf[FILTER_BUFFER];
  62        int i_end, i_ptr;
  63        int o_end, o_ptr;
  64        int input_finished;
  65};
  66
  67struct git_istream {
  68        const struct stream_vtbl *vtbl;
  69        unsigned long size; /* inflated size of full object */
  70        git_zstream z;
  71        enum { z_unused, z_used, z_done, z_error } z_state;
  72
  73        union {
  74                struct {
  75                        char *buf; /* from read_object() */
  76                        unsigned long read_ptr;
  77                } incore;
  78
  79                struct {
  80                        void *mapped;
  81                        unsigned long mapsize;
  82                        char hdr[32];
  83                        int hdr_avail;
  84                        int hdr_used;
  85                } loose;
  86
  87                struct {
  88                        struct packed_git *pack;
  89                        off_t pos;
  90                } in_pack;
  91
  92                struct filtered_istream filtered;
  93        } u;
  94};
  95
  96int close_istream(struct git_istream *st)
  97{
  98        int r = st->vtbl->close(st);
  99        free(st);
 100        return r;
 101}
 102
 103ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)
 104{
 105        return st->vtbl->read(st, buf, sz);
 106}
 107
 108static enum input_source istream_source(const unsigned char *sha1,
 109                                        enum object_type *type,
 110                                        struct object_info *oi)
 111{
 112        unsigned long size;
 113        int status;
 114        struct object_id oid;
 115
 116        hashcpy(oid.hash, sha1);
 117
 118        oi->typep = type;
 119        oi->sizep = &size;
 120        status = oid_object_info_extended(&oid, oi, 0);
 121        if (status < 0)
 122                return stream_error;
 123
 124        switch (oi->whence) {
 125        case OI_LOOSE:
 126                return loose;
 127        case OI_PACKED:
 128                if (!oi->u.packed.is_delta && big_file_threshold < size)
 129                        return pack_non_delta;
 130                /* fallthru */
 131        default:
 132                return incore;
 133        }
 134}
 135
 136struct git_istream *open_istream(const struct object_id *oid,
 137                                 enum object_type *type,
 138                                 unsigned long *size,
 139                                 struct stream_filter *filter)
 140{
 141        struct git_istream *st;
 142        struct object_info oi = OBJECT_INFO_INIT;
 143        const unsigned char *real = lookup_replace_object(oid->hash);
 144        enum input_source src = istream_source(real, type, &oi);
 145        struct object_id realoid;
 146
 147        hashcpy(realoid.hash, real);
 148
 149        if (src < 0)
 150                return NULL;
 151
 152        st = xmalloc(sizeof(*st));
 153        if (open_istream_tbl[src](st, &oi, &realoid, type)) {
 154                if (open_istream_incore(st, &oi, &realoid, type)) {
 155                        free(st);
 156                        return NULL;
 157                }
 158        }
 159        if (filter) {
 160                /* Add "&& !is_null_stream_filter(filter)" for performance */
 161                struct git_istream *nst = attach_stream_filter(st, filter);
 162                if (!nst) {
 163                        close_istream(st);
 164                        return NULL;
 165                }
 166                st = nst;
 167        }
 168
 169        *size = st->size;
 170        return st;
 171}
 172
 173
 174/*****************************************************************
 175 *
 176 * Common helpers
 177 *
 178 *****************************************************************/
 179
 180static void close_deflated_stream(struct git_istream *st)
 181{
 182        if (st->z_state == z_used)
 183                git_inflate_end(&st->z);
 184}
 185
 186
 187/*****************************************************************
 188 *
 189 * Filtered stream
 190 *
 191 *****************************************************************/
 192
 193static close_method_decl(filtered)
 194{
 195        free_stream_filter(st->u.filtered.filter);
 196        return close_istream(st->u.filtered.upstream);
 197}
 198
 199static read_method_decl(filtered)
 200{
 201        struct filtered_istream *fs = &(st->u.filtered);
 202        size_t filled = 0;
 203
 204        while (sz) {
 205                /* do we already have filtered output? */
 206                if (fs->o_ptr < fs->o_end) {
 207                        size_t to_move = fs->o_end - fs->o_ptr;
 208                        if (sz < to_move)
 209                                to_move = sz;
 210                        memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
 211                        fs->o_ptr += to_move;
 212                        sz -= to_move;
 213                        filled += to_move;
 214                        continue;
 215                }
 216                fs->o_end = fs->o_ptr = 0;
 217
 218                /* do we have anything to feed the filter with? */
 219                if (fs->i_ptr < fs->i_end) {
 220                        size_t to_feed = fs->i_end - fs->i_ptr;
 221                        size_t to_receive = FILTER_BUFFER;
 222                        if (stream_filter(fs->filter,
 223                                          fs->ibuf + fs->i_ptr, &to_feed,
 224                                          fs->obuf, &to_receive))
 225                                return -1;
 226                        fs->i_ptr = fs->i_end - to_feed;
 227                        fs->o_end = FILTER_BUFFER - to_receive;
 228                        continue;
 229                }
 230
 231                /* tell the filter to drain upon no more input */
 232                if (fs->input_finished) {
 233                        size_t to_receive = FILTER_BUFFER;
 234                        if (stream_filter(fs->filter,
 235                                          NULL, NULL,
 236                                          fs->obuf, &to_receive))
 237                                return -1;
 238                        fs->o_end = FILTER_BUFFER - to_receive;
 239                        if (!fs->o_end)
 240                                break;
 241                        continue;
 242                }
 243                fs->i_end = fs->i_ptr = 0;
 244
 245                /* refill the input from the upstream */
 246                if (!fs->input_finished) {
 247                        fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
 248                        if (fs->i_end < 0)
 249                                return -1;
 250                        if (fs->i_end)
 251                                continue;
 252                }
 253                fs->input_finished = 1;
 254        }
 255        return filled;
 256}
 257
 258static struct stream_vtbl filtered_vtbl = {
 259        close_istream_filtered,
 260        read_istream_filtered,
 261};
 262
 263static struct git_istream *attach_stream_filter(struct git_istream *st,
 264                                                struct stream_filter *filter)
 265{
 266        struct git_istream *ifs = xmalloc(sizeof(*ifs));
 267        struct filtered_istream *fs = &(ifs->u.filtered);
 268
 269        ifs->vtbl = &filtered_vtbl;
 270        fs->upstream = st;
 271        fs->filter = filter;
 272        fs->i_end = fs->i_ptr = 0;
 273        fs->o_end = fs->o_ptr = 0;
 274        fs->input_finished = 0;
 275        ifs->size = -1; /* unknown */
 276        return ifs;
 277}
 278
 279/*****************************************************************
 280 *
 281 * Loose object stream
 282 *
 283 *****************************************************************/
 284
 285static read_method_decl(loose)
 286{
 287        size_t total_read = 0;
 288
 289        switch (st->z_state) {
 290        case z_done:
 291                return 0;
 292        case z_error:
 293                return -1;
 294        default:
 295                break;
 296        }
 297
 298        if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
 299                size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
 300                if (sz < to_copy)
 301                        to_copy = sz;
 302                memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
 303                st->u.loose.hdr_used += to_copy;
 304                total_read += to_copy;
 305        }
 306
 307        while (total_read < sz) {
 308                int status;
 309
 310                st->z.next_out = (unsigned char *)buf + total_read;
 311                st->z.avail_out = sz - total_read;
 312                status = git_inflate(&st->z, Z_FINISH);
 313
 314                total_read = st->z.next_out - (unsigned char *)buf;
 315
 316                if (status == Z_STREAM_END) {
 317                        git_inflate_end(&st->z);
 318                        st->z_state = z_done;
 319                        break;
 320                }
 321                if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {
 322                        git_inflate_end(&st->z);
 323                        st->z_state = z_error;
 324                        return -1;
 325                }
 326        }
 327        return total_read;
 328}
 329
 330static close_method_decl(loose)
 331{
 332        close_deflated_stream(st);
 333        munmap(st->u.loose.mapped, st->u.loose.mapsize);
 334        return 0;
 335}
 336
 337static struct stream_vtbl loose_vtbl = {
 338        close_istream_loose,
 339        read_istream_loose,
 340};
 341
 342static open_method_decl(loose)
 343{
 344        st->u.loose.mapped = map_sha1_file(oid->hash, &st->u.loose.mapsize);
 345        if (!st->u.loose.mapped)
 346                return -1;
 347        if ((unpack_sha1_header(&st->z,
 348                                st->u.loose.mapped,
 349                                st->u.loose.mapsize,
 350                                st->u.loose.hdr,
 351                                sizeof(st->u.loose.hdr)) < 0) ||
 352            (parse_sha1_header(st->u.loose.hdr, &st->size) < 0)) {
 353                git_inflate_end(&st->z);
 354                munmap(st->u.loose.mapped, st->u.loose.mapsize);
 355                return -1;
 356        }
 357
 358        st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
 359        st->u.loose.hdr_avail = st->z.total_out;
 360        st->z_state = z_used;
 361
 362        st->vtbl = &loose_vtbl;
 363        return 0;
 364}
 365
 366
 367/*****************************************************************
 368 *
 369 * Non-delta packed object stream
 370 *
 371 *****************************************************************/
 372
 373static read_method_decl(pack_non_delta)
 374{
 375        size_t total_read = 0;
 376
 377        switch (st->z_state) {
 378        case z_unused:
 379                memset(&st->z, 0, sizeof(st->z));
 380                git_inflate_init(&st->z);
 381                st->z_state = z_used;
 382                break;
 383        case z_done:
 384                return 0;
 385        case z_error:
 386                return -1;
 387        case z_used:
 388                break;
 389        }
 390
 391        while (total_read < sz) {
 392                int status;
 393                struct pack_window *window = NULL;
 394                unsigned char *mapped;
 395
 396                mapped = use_pack(st->u.in_pack.pack, &window,
 397                                  st->u.in_pack.pos, &st->z.avail_in);
 398
 399                st->z.next_out = (unsigned char *)buf + total_read;
 400                st->z.avail_out = sz - total_read;
 401                st->z.next_in = mapped;
 402                status = git_inflate(&st->z, Z_FINISH);
 403
 404                st->u.in_pack.pos += st->z.next_in - mapped;
 405                total_read = st->z.next_out - (unsigned char *)buf;
 406                unuse_pack(&window);
 407
 408                if (status == Z_STREAM_END) {
 409                        git_inflate_end(&st->z);
 410                        st->z_state = z_done;
 411                        break;
 412                }
 413                if (status != Z_OK && status != Z_BUF_ERROR) {
 414                        git_inflate_end(&st->z);
 415                        st->z_state = z_error;
 416                        return -1;
 417                }
 418        }
 419        return total_read;
 420}
 421
 422static close_method_decl(pack_non_delta)
 423{
 424        close_deflated_stream(st);
 425        return 0;
 426}
 427
 428static struct stream_vtbl pack_non_delta_vtbl = {
 429        close_istream_pack_non_delta,
 430        read_istream_pack_non_delta,
 431};
 432
 433static open_method_decl(pack_non_delta)
 434{
 435        struct pack_window *window;
 436        enum object_type in_pack_type;
 437
 438        st->u.in_pack.pack = oi->u.packed.pack;
 439        st->u.in_pack.pos = oi->u.packed.offset;
 440        window = NULL;
 441
 442        in_pack_type = unpack_object_header(st->u.in_pack.pack,
 443                                            &window,
 444                                            &st->u.in_pack.pos,
 445                                            &st->size);
 446        unuse_pack(&window);
 447        switch (in_pack_type) {
 448        default:
 449                return -1; /* we do not do deltas for now */
 450        case OBJ_COMMIT:
 451        case OBJ_TREE:
 452        case OBJ_BLOB:
 453        case OBJ_TAG:
 454                break;
 455        }
 456        st->z_state = z_unused;
 457        st->vtbl = &pack_non_delta_vtbl;
 458        return 0;
 459}
 460
 461
 462/*****************************************************************
 463 *
 464 * In-core stream
 465 *
 466 *****************************************************************/
 467
 468static close_method_decl(incore)
 469{
 470        free(st->u.incore.buf);
 471        return 0;
 472}
 473
 474static read_method_decl(incore)
 475{
 476        size_t read_size = sz;
 477        size_t remainder = st->size - st->u.incore.read_ptr;
 478
 479        if (remainder <= read_size)
 480                read_size = remainder;
 481        if (read_size) {
 482                memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
 483                st->u.incore.read_ptr += read_size;
 484        }
 485        return read_size;
 486}
 487
 488static struct stream_vtbl incore_vtbl = {
 489        close_istream_incore,
 490        read_istream_incore,
 491};
 492
 493static open_method_decl(incore)
 494{
 495        st->u.incore.buf = read_sha1_file_extended(oid->hash, type, &st->size, 0);
 496        st->u.incore.read_ptr = 0;
 497        st->vtbl = &incore_vtbl;
 498
 499        return st->u.incore.buf ? 0 : -1;
 500}
 501
 502
 503/****************************************************************
 504 * Users of streaming interface
 505 ****************************************************************/
 506
 507int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,
 508                      int can_seek)
 509{
 510        struct git_istream *st;
 511        enum object_type type;
 512        unsigned long sz;
 513        ssize_t kept = 0;
 514        int result = -1;
 515
 516        st = open_istream(oid, &type, &sz, filter);
 517        if (!st) {
 518                if (filter)
 519                        free_stream_filter(filter);
 520                return result;
 521        }
 522        if (type != OBJ_BLOB)
 523                goto close_and_exit;
 524        for (;;) {
 525                char buf[1024 * 16];
 526                ssize_t wrote, holeto;
 527                ssize_t readlen = read_istream(st, buf, sizeof(buf));
 528
 529                if (readlen < 0)
 530                        goto close_and_exit;
 531                if (!readlen)
 532                        break;
 533                if (can_seek && sizeof(buf) == readlen) {
 534                        for (holeto = 0; holeto < readlen; holeto++)
 535                                if (buf[holeto])
 536                                        break;
 537                        if (readlen == holeto) {
 538                                kept += holeto;
 539                                continue;
 540                        }
 541                }
 542
 543                if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)
 544                        goto close_and_exit;
 545                else
 546                        kept = 0;
 547                wrote = write_in_full(fd, buf, readlen);
 548
 549                if (wrote < 0)
 550                        goto close_and_exit;
 551        }
 552        if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||
 553                     xwrite(fd, "", 1) != 1))
 554                goto close_and_exit;
 555        result = 0;
 556
 557 close_and_exit:
 558        close_istream(st);
 559        return result;
 560}