streaming.con commit Merge branch 'lw/daemon-log-destination' (f9bcd75)
   1/*
   2 * Copyright (c) 2011, Google Inc.
   3 */
   4#include "cache.h"
   5#include "streaming.h"
   6#include "repository.h"
   7#include "object-store.h"
   8#include "packfile.h"
   9
  10enum input_source {
  11        stream_error = -1,
  12        incore = 0,
  13        loose = 1,
  14        pack_non_delta = 2
  15};
  16
  17typedef int (*open_istream_fn)(struct git_istream *,
  18                               struct object_info *,
  19                               const struct object_id *,
  20                               enum object_type *);
  21typedef int (*close_istream_fn)(struct git_istream *);
  22typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);
  23
  24struct stream_vtbl {
  25        close_istream_fn close;
  26        read_istream_fn read;
  27};
  28
  29#define open_method_decl(name) \
  30        int open_istream_ ##name \
  31        (struct git_istream *st, struct object_info *oi, \
  32         const struct object_id *oid, \
  33         enum object_type *type)
  34
  35#define close_method_decl(name) \
  36        int close_istream_ ##name \
  37        (struct git_istream *st)
  38
  39#define read_method_decl(name) \
  40        ssize_t read_istream_ ##name \
  41        (struct git_istream *st, char *buf, size_t sz)
  42
  43/* forward declaration */
  44static open_method_decl(incore);
  45static open_method_decl(loose);
  46static open_method_decl(pack_non_delta);
  47static struct git_istream *attach_stream_filter(struct git_istream *st,
  48                                                struct stream_filter *filter);
  49
  50
  51static open_istream_fn open_istream_tbl[] = {
  52        open_istream_incore,
  53        open_istream_loose,
  54        open_istream_pack_non_delta,
  55};
  56
  57#define FILTER_BUFFER (1024*16)
  58
  59struct filtered_istream {
  60        struct git_istream *upstream;
  61        struct stream_filter *filter;
  62        char ibuf[FILTER_BUFFER];
  63        char obuf[FILTER_BUFFER];
  64        int i_end, i_ptr;
  65        int o_end, o_ptr;
  66        int input_finished;
  67};
  68
  69struct git_istream {
  70        const struct stream_vtbl *vtbl;
  71        unsigned long size; /* inflated size of full object */
  72        git_zstream z;
  73        enum { z_unused, z_used, z_done, z_error } z_state;
  74
  75        union {
  76                struct {
  77                        char *buf; /* from read_object() */
  78                        unsigned long read_ptr;
  79                } incore;
  80
  81                struct {
  82                        void *mapped;
  83                        unsigned long mapsize;
  84                        char hdr[32];
  85                        int hdr_avail;
  86                        int hdr_used;
  87                } loose;
  88
  89                struct {
  90                        struct packed_git *pack;
  91                        off_t pos;
  92                } in_pack;
  93
  94                struct filtered_istream filtered;
  95        } u;
  96};
  97
  98int close_istream(struct git_istream *st)
  99{
 100        int r = st->vtbl->close(st);
 101        free(st);
 102        return r;
 103}
 104
 105ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)
 106{
 107        return st->vtbl->read(st, buf, sz);
 108}
 109
 110static enum input_source istream_source(const struct object_id *oid,
 111                                        enum object_type *type,
 112                                        struct object_info *oi)
 113{
 114        unsigned long size;
 115        int status;
 116
 117        oi->typep = type;
 118        oi->sizep = &size;
 119        status = oid_object_info_extended(oid, oi, 0);
 120        if (status < 0)
 121                return stream_error;
 122
 123        switch (oi->whence) {
 124        case OI_LOOSE:
 125                return loose;
 126        case OI_PACKED:
 127                if (!oi->u.packed.is_delta && big_file_threshold < size)
 128                        return pack_non_delta;
 129                /* fallthru */
 130        default:
 131                return incore;
 132        }
 133}
 134
 135struct git_istream *open_istream(const struct object_id *oid,
 136                                 enum object_type *type,
 137                                 unsigned long *size,
 138                                 struct stream_filter *filter)
 139{
 140        struct git_istream *st;
 141        struct object_info oi = OBJECT_INFO_INIT;
 142        const struct object_id *real = lookup_replace_object(oid);
 143        enum input_source src = istream_source(real, type, &oi);
 144
 145        if (src < 0)
 146                return NULL;
 147
 148        st = xmalloc(sizeof(*st));
 149        if (open_istream_tbl[src](st, &oi, real, type)) {
 150                if (open_istream_incore(st, &oi, real, type)) {
 151                        free(st);
 152                        return NULL;
 153                }
 154        }
 155        if (filter) {
 156                /* Add "&& !is_null_stream_filter(filter)" for performance */
 157                struct git_istream *nst = attach_stream_filter(st, filter);
 158                if (!nst) {
 159                        close_istream(st);
 160                        return NULL;
 161                }
 162                st = nst;
 163        }
 164
 165        *size = st->size;
 166        return st;
 167}
 168
 169
 170/*****************************************************************
 171 *
 172 * Common helpers
 173 *
 174 *****************************************************************/
 175
 176static void close_deflated_stream(struct git_istream *st)
 177{
 178        if (st->z_state == z_used)
 179                git_inflate_end(&st->z);
 180}
 181
 182
 183/*****************************************************************
 184 *
 185 * Filtered stream
 186 *
 187 *****************************************************************/
 188
 189static close_method_decl(filtered)
 190{
 191        free_stream_filter(st->u.filtered.filter);
 192        return close_istream(st->u.filtered.upstream);
 193}
 194
 195static read_method_decl(filtered)
 196{
 197        struct filtered_istream *fs = &(st->u.filtered);
 198        size_t filled = 0;
 199
 200        while (sz) {
 201                /* do we already have filtered output? */
 202                if (fs->o_ptr < fs->o_end) {
 203                        size_t to_move = fs->o_end - fs->o_ptr;
 204                        if (sz < to_move)
 205                                to_move = sz;
 206                        memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
 207                        fs->o_ptr += to_move;
 208                        sz -= to_move;
 209                        filled += to_move;
 210                        continue;
 211                }
 212                fs->o_end = fs->o_ptr = 0;
 213
 214                /* do we have anything to feed the filter with? */
 215                if (fs->i_ptr < fs->i_end) {
 216                        size_t to_feed = fs->i_end - fs->i_ptr;
 217                        size_t to_receive = FILTER_BUFFER;
 218                        if (stream_filter(fs->filter,
 219                                          fs->ibuf + fs->i_ptr, &to_feed,
 220                                          fs->obuf, &to_receive))
 221                                return -1;
 222                        fs->i_ptr = fs->i_end - to_feed;
 223                        fs->o_end = FILTER_BUFFER - to_receive;
 224                        continue;
 225                }
 226
 227                /* tell the filter to drain upon no more input */
 228                if (fs->input_finished) {
 229                        size_t to_receive = FILTER_BUFFER;
 230                        if (stream_filter(fs->filter,
 231                                          NULL, NULL,
 232                                          fs->obuf, &to_receive))
 233                                return -1;
 234                        fs->o_end = FILTER_BUFFER - to_receive;
 235                        if (!fs->o_end)
 236                                break;
 237                        continue;
 238                }
 239                fs->i_end = fs->i_ptr = 0;
 240
 241                /* refill the input from the upstream */
 242                if (!fs->input_finished) {
 243                        fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
 244                        if (fs->i_end < 0)
 245                                return -1;
 246                        if (fs->i_end)
 247                                continue;
 248                }
 249                fs->input_finished = 1;
 250        }
 251        return filled;
 252}
 253
 254static struct stream_vtbl filtered_vtbl = {
 255        close_istream_filtered,
 256        read_istream_filtered,
 257};
 258
 259static struct git_istream *attach_stream_filter(struct git_istream *st,
 260                                                struct stream_filter *filter)
 261{
 262        struct git_istream *ifs = xmalloc(sizeof(*ifs));
 263        struct filtered_istream *fs = &(ifs->u.filtered);
 264
 265        ifs->vtbl = &filtered_vtbl;
 266        fs->upstream = st;
 267        fs->filter = filter;
 268        fs->i_end = fs->i_ptr = 0;
 269        fs->o_end = fs->o_ptr = 0;
 270        fs->input_finished = 0;
 271        ifs->size = -1; /* unknown */
 272        return ifs;
 273}
 274
 275/*****************************************************************
 276 *
 277 * Loose object stream
 278 *
 279 *****************************************************************/
 280
 281static read_method_decl(loose)
 282{
 283        size_t total_read = 0;
 284
 285        switch (st->z_state) {
 286        case z_done:
 287                return 0;
 288        case z_error:
 289                return -1;
 290        default:
 291                break;
 292        }
 293
 294        if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
 295                size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
 296                if (sz < to_copy)
 297                        to_copy = sz;
 298                memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
 299                st->u.loose.hdr_used += to_copy;
 300                total_read += to_copy;
 301        }
 302
 303        while (total_read < sz) {
 304                int status;
 305
 306                st->z.next_out = (unsigned char *)buf + total_read;
 307                st->z.avail_out = sz - total_read;
 308                status = git_inflate(&st->z, Z_FINISH);
 309
 310                total_read = st->z.next_out - (unsigned char *)buf;
 311
 312                if (status == Z_STREAM_END) {
 313                        git_inflate_end(&st->z);
 314                        st->z_state = z_done;
 315                        break;
 316                }
 317                if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {
 318                        git_inflate_end(&st->z);
 319                        st->z_state = z_error;
 320                        return -1;
 321                }
 322        }
 323        return total_read;
 324}
 325
 326static close_method_decl(loose)
 327{
 328        close_deflated_stream(st);
 329        munmap(st->u.loose.mapped, st->u.loose.mapsize);
 330        return 0;
 331}
 332
 333static struct stream_vtbl loose_vtbl = {
 334        close_istream_loose,
 335        read_istream_loose,
 336};
 337
 338static open_method_decl(loose)
 339{
 340        st->u.loose.mapped = map_sha1_file(the_repository,
 341                                           oid->hash, &st->u.loose.mapsize);
 342        if (!st->u.loose.mapped)
 343                return -1;
 344        if ((unpack_sha1_header(&st->z,
 345                                st->u.loose.mapped,
 346                                st->u.loose.mapsize,
 347                                st->u.loose.hdr,
 348                                sizeof(st->u.loose.hdr)) < 0) ||
 349            (parse_sha1_header(st->u.loose.hdr, &st->size) < 0)) {
 350                git_inflate_end(&st->z);
 351                munmap(st->u.loose.mapped, st->u.loose.mapsize);
 352                return -1;
 353        }
 354
 355        st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
 356        st->u.loose.hdr_avail = st->z.total_out;
 357        st->z_state = z_used;
 358
 359        st->vtbl = &loose_vtbl;
 360        return 0;
 361}
 362
 363
 364/*****************************************************************
 365 *
 366 * Non-delta packed object stream
 367 *
 368 *****************************************************************/
 369
 370static read_method_decl(pack_non_delta)
 371{
 372        size_t total_read = 0;
 373
 374        switch (st->z_state) {
 375        case z_unused:
 376                memset(&st->z, 0, sizeof(st->z));
 377                git_inflate_init(&st->z);
 378                st->z_state = z_used;
 379                break;
 380        case z_done:
 381                return 0;
 382        case z_error:
 383                return -1;
 384        case z_used:
 385                break;
 386        }
 387
 388        while (total_read < sz) {
 389                int status;
 390                struct pack_window *window = NULL;
 391                unsigned char *mapped;
 392
 393                mapped = use_pack(st->u.in_pack.pack, &window,
 394                                  st->u.in_pack.pos, &st->z.avail_in);
 395
 396                st->z.next_out = (unsigned char *)buf + total_read;
 397                st->z.avail_out = sz - total_read;
 398                st->z.next_in = mapped;
 399                status = git_inflate(&st->z, Z_FINISH);
 400
 401                st->u.in_pack.pos += st->z.next_in - mapped;
 402                total_read = st->z.next_out - (unsigned char *)buf;
 403                unuse_pack(&window);
 404
 405                if (status == Z_STREAM_END) {
 406                        git_inflate_end(&st->z);
 407                        st->z_state = z_done;
 408                        break;
 409                }
 410                if (status != Z_OK && status != Z_BUF_ERROR) {
 411                        git_inflate_end(&st->z);
 412                        st->z_state = z_error;
 413                        return -1;
 414                }
 415        }
 416        return total_read;
 417}
 418
 419static close_method_decl(pack_non_delta)
 420{
 421        close_deflated_stream(st);
 422        return 0;
 423}
 424
 425static struct stream_vtbl pack_non_delta_vtbl = {
 426        close_istream_pack_non_delta,
 427        read_istream_pack_non_delta,
 428};
 429
 430static open_method_decl(pack_non_delta)
 431{
 432        struct pack_window *window;
 433        enum object_type in_pack_type;
 434
 435        st->u.in_pack.pack = oi->u.packed.pack;
 436        st->u.in_pack.pos = oi->u.packed.offset;
 437        window = NULL;
 438
 439        in_pack_type = unpack_object_header(st->u.in_pack.pack,
 440                                            &window,
 441                                            &st->u.in_pack.pos,
 442                                            &st->size);
 443        unuse_pack(&window);
 444        switch (in_pack_type) {
 445        default:
 446                return -1; /* we do not do deltas for now */
 447        case OBJ_COMMIT:
 448        case OBJ_TREE:
 449        case OBJ_BLOB:
 450        case OBJ_TAG:
 451                break;
 452        }
 453        st->z_state = z_unused;
 454        st->vtbl = &pack_non_delta_vtbl;
 455        return 0;
 456}
 457
 458
 459/*****************************************************************
 460 *
 461 * In-core stream
 462 *
 463 *****************************************************************/
 464
 465static close_method_decl(incore)
 466{
 467        free(st->u.incore.buf);
 468        return 0;
 469}
 470
 471static read_method_decl(incore)
 472{
 473        size_t read_size = sz;
 474        size_t remainder = st->size - st->u.incore.read_ptr;
 475
 476        if (remainder <= read_size)
 477                read_size = remainder;
 478        if (read_size) {
 479                memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
 480                st->u.incore.read_ptr += read_size;
 481        }
 482        return read_size;
 483}
 484
 485static struct stream_vtbl incore_vtbl = {
 486        close_istream_incore,
 487        read_istream_incore,
 488};
 489
 490static open_method_decl(incore)
 491{
 492        st->u.incore.buf = read_object_file_extended(oid, type, &st->size, 0);
 493        st->u.incore.read_ptr = 0;
 494        st->vtbl = &incore_vtbl;
 495
 496        return st->u.incore.buf ? 0 : -1;
 497}
 498
 499
 500/****************************************************************
 501 * Users of streaming interface
 502 ****************************************************************/
 503
 504int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,
 505                      int can_seek)
 506{
 507        struct git_istream *st;
 508        enum object_type type;
 509        unsigned long sz;
 510        ssize_t kept = 0;
 511        int result = -1;
 512
 513        st = open_istream(oid, &type, &sz, filter);
 514        if (!st) {
 515                if (filter)
 516                        free_stream_filter(filter);
 517                return result;
 518        }
 519        if (type != OBJ_BLOB)
 520                goto close_and_exit;
 521        for (;;) {
 522                char buf[1024 * 16];
 523                ssize_t wrote, holeto;
 524                ssize_t readlen = read_istream(st, buf, sizeof(buf));
 525
 526                if (readlen < 0)
 527                        goto close_and_exit;
 528                if (!readlen)
 529                        break;
 530                if (can_seek && sizeof(buf) == readlen) {
 531                        for (holeto = 0; holeto < readlen; holeto++)
 532                                if (buf[holeto])
 533                                        break;
 534                        if (readlen == holeto) {
 535                                kept += holeto;
 536                                continue;
 537                        }
 538                }
 539
 540                if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)
 541                        goto close_and_exit;
 542                else
 543                        kept = 0;
 544                wrote = write_in_full(fd, buf, readlen);
 545
 546                if (wrote < 0)
 547                        goto close_and_exit;
 548        }
 549        if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||
 550                     xwrite(fd, "", 1) != 1))
 551                goto close_and_exit;
 552        result = 0;
 553
 554 close_and_exit:
 555        close_istream(st);
 556        return result;
 557}