streaming.con commit Merge branch 'jn/vcs-svn-cleanup' into next (c184f08)
   1/*
   2 * Copyright (c) 2011, Google Inc.
   3 */
   4#include "cache.h"
   5#include "streaming.h"
   6#include "packfile.h"
   7
/*
 * Where an object's data will be read from.  open_istream() uses the
 * non-negative values as indices into open_istream_tbl[], so they must
 * stay in step with the order of that table.  stream_error is the
 * negative value istream_source() returns when the object lookup fails.
 */
enum input_source {
        stream_error = -1,
        incore = 0,
        loose = 1,
        pack_non_delta = 2
};
  14
/*
 * Signature shared by the open_istream_<kind>() functions: set up the
 * given stream for the named object.  open_istream() treats a non-zero
 * return as failure.
 */
typedef int (*open_istream_fn)(struct git_istream *,
                               struct object_info *,
                               const unsigned char *,
                               enum object_type *);
/*
 * Release the kind-specific resources of a stream.  The stream structure
 * itself is freed by close_istream(), not by this method.
 */
typedef int (*close_istream_fn)(struct git_istream *);
/*
 * Store up to the given number of bytes in the buffer and return how many
 * were stored; the implementations in this file return -1 on error.
 */
typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);

/* Per-kind methods that close_istream() and read_istream() dispatch to. */
struct stream_vtbl {
        close_istream_fn close;
        read_istream_fn read;
};
  26
/*
 * The three macros below spell out the prototype of the open, close and
 * read method for stream kind "name", so that both forward declarations
 * and definitions can be written as e.g. "static open_method_decl(loose)".
 * The parameter names they introduce (st, oi, sha1, type, buf, sz) are
 * the ones the method bodies use.
 */
#define open_method_decl(name) \
        int open_istream_ ##name \
        (struct git_istream *st, struct object_info *oi, \
         const unsigned char *sha1, \
         enum object_type *type)

#define close_method_decl(name) \
        int close_istream_ ##name \
        (struct git_istream *st)

#define read_method_decl(name) \
        ssize_t read_istream_ ##name \
        (struct git_istream *st, char *buf, size_t sz)
  40
/*
 * Forward declarations: the open methods are referenced by the table
 * below and attach_stream_filter() by open_istream(), all of which come
 * before the definitions.
 */
static open_method_decl(incore);
static open_method_decl(loose);
static open_method_decl(pack_non_delta);
static struct git_istream *attach_stream_filter(struct git_istream *st,
                                                struct stream_filter *filter);


/*
 * Open methods indexed by enum input_source; the entry order must match
 * the values of incore, loose and pack_non_delta.
 */
static open_istream_fn open_istream_tbl[] = {
        open_istream_incore,
        open_istream_loose,
        open_istream_pack_non_delta,
};
  54
/* Size in bytes of each of the two staging buffers of a filtered stream. */
#define FILTER_BUFFER (1024*16)

/* State of a stream that passes another stream's data through a filter. */
struct filtered_istream {
        struct git_istream *upstream; /* stream the unfiltered data is read from */
        struct stream_filter *filter; /* released when the filtered stream is closed */
        char ibuf[FILTER_BUFFER]; /* data read from upstream, not yet filtered */
        char obuf[FILTER_BUFFER]; /* filter output, not yet handed to the reader */
        int i_end, i_ptr; /* pending input is ibuf[i_ptr .. i_end) */
        int o_end, o_ptr; /* pending output is obuf[o_ptr .. o_end) */
        int input_finished; /* set once a read from upstream returned 0 bytes */
};
  66
/*
 * A stream over the contents of one object.  "vtbl" selects the
 * kind-specific close/read methods, and the member of "u" that belongs
 * to that kind holds its private state.
 */
struct git_istream {
        const struct stream_vtbl *vtbl;
        unsigned long size; /* inflated size of full object */
        git_zstream z; /* inflate state of the loose and in_pack kinds */
        enum { z_unused, z_used, z_done, z_error } z_state;

        union {
                struct {
                        char *buf; /* from read_sha1_file_extended() */
                        unsigned long read_ptr; /* offset in buf of the next byte to hand out */
                } incore;

                struct {
                        void *mapped; /* from map_sha1_file() */
                        unsigned long mapsize;
                        char hdr[32]; /* first inflated bytes, starting with the object header */
                        int hdr_avail; /* number of inflated bytes held in hdr */
                        int hdr_used; /* number of bytes of hdr already consumed */
                } loose;

                struct {
                        struct packed_git *pack;
                        off_t pos; /* pack offset the next read continues from */
                } in_pack;

                struct filtered_istream filtered;
        } u;
};
  95
  96int close_istream(struct git_istream *st)
  97{
  98        int r = st->vtbl->close(st);
  99        free(st);
 100        return r;
 101}
 102
 103ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)
 104{
 105        return st->vtbl->read(st, buf, sz);
 106}
 107
 108static enum input_source istream_source(const unsigned char *sha1,
 109                                        enum object_type *type,
 110                                        struct object_info *oi)
 111{
 112        unsigned long size;
 113        int status;
 114
 115        oi->typep = type;
 116        oi->sizep = &size;
 117        status = sha1_object_info_extended(sha1, oi, 0);
 118        if (status < 0)
 119                return stream_error;
 120
 121        switch (oi->whence) {
 122        case OI_LOOSE:
 123                return loose;
 124        case OI_PACKED:
 125                if (!oi->u.packed.is_delta && big_file_threshold < size)
 126                        return pack_non_delta;
 127                /* fallthru */
 128        default:
 129                return incore;
 130        }
 131}
 132
 133struct git_istream *open_istream(const unsigned char *sha1,
 134                                 enum object_type *type,
 135                                 unsigned long *size,
 136                                 struct stream_filter *filter)
 137{
 138        struct git_istream *st;
 139        struct object_info oi = OBJECT_INFO_INIT;
 140        const unsigned char *real = lookup_replace_object(sha1);
 141        enum input_source src = istream_source(real, type, &oi);
 142
 143        if (src < 0)
 144                return NULL;
 145
 146        st = xmalloc(sizeof(*st));
 147        if (open_istream_tbl[src](st, &oi, real, type)) {
 148                if (open_istream_incore(st, &oi, real, type)) {
 149                        free(st);
 150                        return NULL;
 151                }
 152        }
 153        if (filter) {
 154                /* Add "&& !is_null_stream_filter(filter)" for performance */
 155                struct git_istream *nst = attach_stream_filter(st, filter);
 156                if (!nst) {
 157                        close_istream(st);
 158                        return NULL;
 159                }
 160                st = nst;
 161        }
 162
 163        *size = st->size;
 164        return st;
 165}
 166
 167
 168/*****************************************************************
 169 *
 170 * Common helpers
 171 *
 172 *****************************************************************/
 173
 174static void close_deflated_stream(struct git_istream *st)
 175{
 176        if (st->z_state == z_used)
 177                git_inflate_end(&st->z);
 178}
 179
 180
 181/*****************************************************************
 182 *
 183 * Filtered stream
 184 *
 185 *****************************************************************/
 186
 187static close_method_decl(filtered)
 188{
 189        free_stream_filter(st->u.filtered.filter);
 190        return close_istream(st->u.filtered.upstream);
 191}
 192
/*
 * Read method of a filtered stream.
 *
 * Stores up to "sz" bytes of filter output in "buf".  Each pass of the
 * loop takes the first step that applies: hand out pending output, run
 * the filter over pending input, drain the filter once the upstream is
 * exhausted, or refill the input buffer from the upstream.
 *
 * Returns the number of bytes stored, which is less than "sz" only when
 * the filter has nothing more to give, or -1 if the filter or the
 * upstream read reports an error.
 */
static read_method_decl(filtered)
{
        struct filtered_istream *fs = &(st->u.filtered);
        size_t filled = 0;

        while (sz) {
                /* do we already have filtered output? */
                if (fs->o_ptr < fs->o_end) {
                        size_t to_move = fs->o_end - fs->o_ptr;
                        if (sz < to_move)
                                to_move = sz;
                        memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
                        fs->o_ptr += to_move;
                        sz -= to_move;
                        filled += to_move;
                        continue;
                }
                /* the output buffer is empty; start again from its beginning */
                fs->o_end = fs->o_ptr = 0;

                /* do we have anything to feed the filter with? */
                if (fs->i_ptr < fs->i_end) {
                        size_t to_feed = fs->i_end - fs->i_ptr;
                        size_t to_receive = FILTER_BUFFER;
                        if (stream_filter(fs->filter,
                                          fs->ibuf + fs->i_ptr, &to_feed,
                                          fs->obuf, &to_receive))
                                return -1;
                        /*
                         * The bookkeeping below treats to_feed and to_receive
                         * as the input left unconsumed and the output space
                         * left unused after the call.
                         */
                        fs->i_ptr = fs->i_end - to_feed;
                        fs->o_end = FILTER_BUFFER - to_receive;
                        continue;
                }

                /* tell the filter to drain upon no more input */
                if (fs->input_finished) {
                        size_t to_receive = FILTER_BUFFER;
                        if (stream_filter(fs->filter,
                                          NULL, NULL,
                                          fs->obuf, &to_receive))
                                return -1;
                        fs->o_end = FILTER_BUFFER - to_receive;
                        /* no output from a drain call: the stream is at its end */
                        if (!fs->o_end)
                                break;
                        continue;
                }
                /* the input buffer is empty; start again from its beginning */
                fs->i_end = fs->i_ptr = 0;

                /* refill the input from the upstream */
                /*
                 * (this test always succeeds here, because the
                 * input_finished case above either breaks or continues)
                 */
                if (!fs->input_finished) {
                        fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
                        if (fs->i_end < 0)
                                return -1;
                        if (fs->i_end)
                                continue;
                }
                /* upstream returned 0 bytes; drain the filter on the next pass */
                fs->input_finished = 1;
        }
        return filled;
}
 251
 252static struct stream_vtbl filtered_vtbl = {
 253        close_istream_filtered,
 254        read_istream_filtered,
 255};
 256
 257static struct git_istream *attach_stream_filter(struct git_istream *st,
 258                                                struct stream_filter *filter)
 259{
 260        struct git_istream *ifs = xmalloc(sizeof(*ifs));
 261        struct filtered_istream *fs = &(ifs->u.filtered);
 262
 263        ifs->vtbl = &filtered_vtbl;
 264        fs->upstream = st;
 265        fs->filter = filter;
 266        fs->i_end = fs->i_ptr = 0;
 267        fs->o_end = fs->o_ptr = 0;
 268        fs->input_finished = 0;
 269        ifs->size = -1; /* unknown */
 270        return ifs;
 271}
 272
 273/*****************************************************************
 274 *
 275 * Loose object stream
 276 *
 277 *****************************************************************/
 278
 279static read_method_decl(loose)
 280{
 281        size_t total_read = 0;
 282
 283        switch (st->z_state) {
 284        case z_done:
 285                return 0;
 286        case z_error:
 287                return -1;
 288        default:
 289                break;
 290        }
 291
 292        if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
 293                size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
 294                if (sz < to_copy)
 295                        to_copy = sz;
 296                memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
 297                st->u.loose.hdr_used += to_copy;
 298                total_read += to_copy;
 299        }
 300
 301        while (total_read < sz) {
 302                int status;
 303
 304                st->z.next_out = (unsigned char *)buf + total_read;
 305                st->z.avail_out = sz - total_read;
 306                status = git_inflate(&st->z, Z_FINISH);
 307
 308                total_read = st->z.next_out - (unsigned char *)buf;
 309
 310                if (status == Z_STREAM_END) {
 311                        git_inflate_end(&st->z);
 312                        st->z_state = z_done;
 313                        break;
 314                }
 315                if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {
 316                        git_inflate_end(&st->z);
 317                        st->z_state = z_error;
 318                        return -1;
 319                }
 320        }
 321        return total_read;
 322}
 323
 324static close_method_decl(loose)
 325{
 326        close_deflated_stream(st);
 327        munmap(st->u.loose.mapped, st->u.loose.mapsize);
 328        return 0;
 329}
 330
 331static struct stream_vtbl loose_vtbl = {
 332        close_istream_loose,
 333        read_istream_loose,
 334};
 335
 336static open_method_decl(loose)
 337{
 338        st->u.loose.mapped = map_sha1_file(sha1, &st->u.loose.mapsize);
 339        if (!st->u.loose.mapped)
 340                return -1;
 341        if ((unpack_sha1_header(&st->z,
 342                                st->u.loose.mapped,
 343                                st->u.loose.mapsize,
 344                                st->u.loose.hdr,
 345                                sizeof(st->u.loose.hdr)) < 0) ||
 346            (parse_sha1_header(st->u.loose.hdr, &st->size) < 0)) {
 347                git_inflate_end(&st->z);
 348                munmap(st->u.loose.mapped, st->u.loose.mapsize);
 349                return -1;
 350        }
 351
 352        st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
 353        st->u.loose.hdr_avail = st->z.total_out;
 354        st->z_state = z_used;
 355
 356        st->vtbl = &loose_vtbl;
 357        return 0;
 358}
 359
 360
 361/*****************************************************************
 362 *
 363 * Non-delta packed object stream
 364 *
 365 *****************************************************************/
 366
 367static read_method_decl(pack_non_delta)
 368{
 369        size_t total_read = 0;
 370
 371        switch (st->z_state) {
 372        case z_unused:
 373                memset(&st->z, 0, sizeof(st->z));
 374                git_inflate_init(&st->z);
 375                st->z_state = z_used;
 376                break;
 377        case z_done:
 378                return 0;
 379        case z_error:
 380                return -1;
 381        case z_used:
 382                break;
 383        }
 384
 385        while (total_read < sz) {
 386                int status;
 387                struct pack_window *window = NULL;
 388                unsigned char *mapped;
 389
 390                mapped = use_pack(st->u.in_pack.pack, &window,
 391                                  st->u.in_pack.pos, &st->z.avail_in);
 392
 393                st->z.next_out = (unsigned char *)buf + total_read;
 394                st->z.avail_out = sz - total_read;
 395                st->z.next_in = mapped;
 396                status = git_inflate(&st->z, Z_FINISH);
 397
 398                st->u.in_pack.pos += st->z.next_in - mapped;
 399                total_read = st->z.next_out - (unsigned char *)buf;
 400                unuse_pack(&window);
 401
 402                if (status == Z_STREAM_END) {
 403                        git_inflate_end(&st->z);
 404                        st->z_state = z_done;
 405                        break;
 406                }
 407                if (status != Z_OK && status != Z_BUF_ERROR) {
 408                        git_inflate_end(&st->z);
 409                        st->z_state = z_error;
 410                        return -1;
 411                }
 412        }
 413        return total_read;
 414}
 415
 416static close_method_decl(pack_non_delta)
 417{
 418        close_deflated_stream(st);
 419        return 0;
 420}
 421
 422static struct stream_vtbl pack_non_delta_vtbl = {
 423        close_istream_pack_non_delta,
 424        read_istream_pack_non_delta,
 425};
 426
 427static open_method_decl(pack_non_delta)
 428{
 429        struct pack_window *window;
 430        enum object_type in_pack_type;
 431
 432        st->u.in_pack.pack = oi->u.packed.pack;
 433        st->u.in_pack.pos = oi->u.packed.offset;
 434        window = NULL;
 435
 436        in_pack_type = unpack_object_header(st->u.in_pack.pack,
 437                                            &window,
 438                                            &st->u.in_pack.pos,
 439                                            &st->size);
 440        unuse_pack(&window);
 441        switch (in_pack_type) {
 442        default:
 443                return -1; /* we do not do deltas for now */
 444        case OBJ_COMMIT:
 445        case OBJ_TREE:
 446        case OBJ_BLOB:
 447        case OBJ_TAG:
 448                break;
 449        }
 450        st->z_state = z_unused;
 451        st->vtbl = &pack_non_delta_vtbl;
 452        return 0;
 453}
 454
 455
 456/*****************************************************************
 457 *
 458 * In-core stream
 459 *
 460 *****************************************************************/
 461
 462static close_method_decl(incore)
 463{
 464        free(st->u.incore.buf);
 465        return 0;
 466}
 467
 468static read_method_decl(incore)
 469{
 470        size_t read_size = sz;
 471        size_t remainder = st->size - st->u.incore.read_ptr;
 472
 473        if (remainder <= read_size)
 474                read_size = remainder;
 475        if (read_size) {
 476                memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
 477                st->u.incore.read_ptr += read_size;
 478        }
 479        return read_size;
 480}
 481
 482static struct stream_vtbl incore_vtbl = {
 483        close_istream_incore,
 484        read_istream_incore,
 485};
 486
 487static open_method_decl(incore)
 488{
 489        st->u.incore.buf = read_sha1_file_extended(sha1, type, &st->size, 0);
 490        st->u.incore.read_ptr = 0;
 491        st->vtbl = &incore_vtbl;
 492
 493        return st->u.incore.buf ? 0 : -1;
 494}
 495
 496
 497/****************************************************************
 498 * Users of streaming interface
 499 ****************************************************************/
 500
 501int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,
 502                      int can_seek)
 503{
 504        struct git_istream *st;
 505        enum object_type type;
 506        unsigned long sz;
 507        ssize_t kept = 0;
 508        int result = -1;
 509
 510        st = open_istream(oid->hash, &type, &sz, filter);
 511        if (!st) {
 512                if (filter)
 513                        free_stream_filter(filter);
 514                return result;
 515        }
 516        if (type != OBJ_BLOB)
 517                goto close_and_exit;
 518        for (;;) {
 519                char buf[1024 * 16];
 520                ssize_t wrote, holeto;
 521                ssize_t readlen = read_istream(st, buf, sizeof(buf));
 522
 523                if (readlen < 0)
 524                        goto close_and_exit;
 525                if (!readlen)
 526                        break;
 527                if (can_seek && sizeof(buf) == readlen) {
 528                        for (holeto = 0; holeto < readlen; holeto++)
 529                                if (buf[holeto])
 530                                        break;
 531                        if (readlen == holeto) {
 532                                kept += holeto;
 533                                continue;
 534                        }
 535                }
 536
 537                if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)
 538                        goto close_and_exit;
 539                else
 540                        kept = 0;
 541                wrote = write_in_full(fd, buf, readlen);
 542
 543                if (wrote != readlen)
 544                        goto close_and_exit;
 545        }
 546        if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||
 547                     xwrite(fd, "", 1) != 1))
 548                goto close_and_exit;
 549        result = 0;
 550
 551 close_and_exit:
 552        close_istream(st);
 553        return result;
 554}