Add streaming filter API
[gitweb.git] / streaming.c
index 4fdd567a36a3f6b22a934838b46f6b2cd71fc3ec..565f000790b482a8977a36ca9a9cfcbd633f056d 100644 (file)
@@ -41,6 +41,9 @@ struct stream_vtbl {
 static open_method_decl(incore);
 static open_method_decl(loose);
 static open_method_decl(pack_non_delta);
+/* Forward declaration; open_istream() uses this before its definition. */
+static struct git_istream *attach_stream_filter(struct git_istream *st,
+                                               struct stream_filter *filter);
+
 
 static open_istream_fn open_istream_tbl[] = {
        open_istream_incore,
@@ -48,6 +51,17 @@ static open_istream_fn open_istream_tbl[] = {
        open_istream_pack_non_delta,
 };
 
+/* Size in bytes of each of the two staging buffers of a filtered stream. */
+#define FILTER_BUFFER (1024*16)
+
+/*
+ * State of a stream whose contents are run through a stream filter.
+ *
+ * ibuf holds bytes read from the upstream that the filter has not yet
+ * consumed; obuf holds filter output that has not yet been handed to
+ * the reader.  In both cases the pending bytes are [ptr, end).
+ */
+struct filtered_istream {
+       struct git_istream *upstream; /* stream the raw data is read from */
+       struct stream_filter *filter; /* filter applied to the raw data */
+       char ibuf[FILTER_BUFFER];
+       char obuf[FILTER_BUFFER];
+       int i_end, i_ptr; /* pending input is ibuf[i_ptr..i_end) */
+       int o_end, o_ptr; /* pending output is obuf[o_ptr..o_end) */
+};
+
 struct git_istream {
        const struct stream_vtbl *vtbl;
        unsigned long size; /* inflated size of full object */
@@ -61,14 +75,19 @@ struct git_istream {
                } incore;
 
                struct {
-                       int fd; /* open for reading */
-                       /* NEEDSWORK: what else? */
+                       void *mapped;
+                       unsigned long mapsize;
+                       char hdr[32];
+                       int hdr_avail;
+                       int hdr_used;
                } loose;
 
                struct {
                        struct packed_git *pack;
                        off_t pos;
                } in_pack;
+
+               struct filtered_istream filtered;
        } u;
 };
 
@@ -109,7 +128,8 @@ static enum input_source istream_source(const unsigned char *sha1,
 
 struct git_istream *open_istream(const unsigned char *sha1,
                                 enum object_type *type,
-                                unsigned long *size)
+                                unsigned long *size,
+                                struct stream_filter *filter)
 {
        struct git_istream *st;
        struct object_info oi;
@@ -126,6 +146,14 @@ struct git_istream *open_istream(const unsigned char *sha1,
                        return NULL;
                }
        }
+       if (st && filter) {
+               /* Add "&& !is_null_stream_filter(filter)" for performance */
+               struct git_istream *nst = attach_stream_filter(st, filter);
+               if (!nst)
+                       close_istream(st);
+               st = nst;
+       }
+
        *size = st->size;
        return st;
 }
@@ -144,15 +172,164 @@ static void close_deflated_stream(struct git_istream *st)
 }
 
 
+/*****************************************************************
+ *
+ * Filtered stream
+ *
+ *****************************************************************/
+
+/*
+ * Close method for a filtered stream: release the filter first, then
+ * close the upstream it was reading from and hand back that result.
+ */
+static close_method_decl(filtered)
+{
+       struct filtered_istream *fs = &st->u.filtered;
+
+       free_stream_filter(fs->filter);
+       return close_istream(fs->upstream);
+}
+
+/*
+ * Read method for a filtered stream.
+ *
+ * Copies up to sz bytes of filter output into buf.  Output left over
+ * from an earlier call is delivered first; after that, input that is
+ * still buffered is run through the filter; only when both buffers are
+ * empty is more data pulled from the upstream.
+ *
+ * Returns the number of bytes stored in buf (short, possibly zero, once
+ * the upstream has nothing more to give), or -1 when either the filter
+ * or the upstream read reports an error.
+ */
+static read_method_decl(filtered)
+{
+       struct filtered_istream *fs = &(st->u.filtered);
+       size_t filled = 0;
+
+       while (sz) {
+               /* do we already have filtered output? */
+               if (fs->o_ptr < fs->o_end) {
+                       size_t to_move = fs->o_end - fs->o_ptr;
+                       if (sz < to_move)
+                               to_move = sz;
+                       memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
+                       fs->o_ptr += to_move;
+                       sz -= to_move;
+                       filled += to_move;
+                       continue;
+               }
+               fs->o_end = fs->o_ptr = 0;
+
+               /* do we have anything to feed the filter with? */
+               if (fs->i_ptr < fs->i_end) {
+                       size_t to_feed = fs->i_end - fs->i_ptr;
+                       size_t to_receive = FILTER_BUFFER;
+                       if (stream_filter(fs->filter,
+                                         fs->ibuf + fs->i_ptr, &to_feed,
+                                         fs->obuf, &to_receive))
+                               return -1;
+                       /*
+                        * On return, to_feed and to_receive are used as
+                        * the amounts left unconsumed / unfilled.
+                        */
+                       fs->i_ptr = fs->i_end - to_feed;
+                       fs->o_end = FILTER_BUFFER - to_receive;
+                       continue;
+               }
+               fs->i_end = fs->i_ptr = 0;
+
+               /* refill the input from the upstream */
+               fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
+               if (fs->i_end < 0) {
+                       /*
+                        * Do not let a failed upstream read pass for
+                        * end-of-stream: leave the input buffer empty
+                        * and report the error to the caller.
+                        */
+                       fs->i_end = 0;
+                       return -1;
+               }
+               if (!fs->i_end)
+                       break;
+       }
+       return filled;
+}
+
+/* Method table installed on streams created by attach_stream_filter(). */
+static struct stream_vtbl filtered_vtbl = {
+       close_istream_filtered,
+       read_istream_filtered,
+};
+
+/*
+ * Wrap the stream st in a freshly allocated stream that runs everything
+ * read from st through filter.  Both staging buffers start out empty
+ * and the size of the filtered result is recorded as unknown.
+ */
+static struct git_istream *attach_stream_filter(struct git_istream *st,
+                                               struct stream_filter *filter)
+{
+       struct git_istream *wrapper = xmalloc(sizeof(*wrapper));
+       struct filtered_istream *state = &wrapper->u.filtered;
+
+       wrapper->vtbl = &filtered_vtbl;
+       wrapper->size = -1; /* unknown */
+
+       state->upstream = st;
+       state->filter = filter;
+       state->i_ptr = 0;
+       state->i_end = 0;
+       state->o_ptr = 0;
+       state->o_end = 0;
+
+       return wrapper;
+}
+
 /*****************************************************************
  *
  * Loose object stream
  *
  *****************************************************************/
 
+/*
+ * Read method for a loose object stream.
+ *
+ * Stores up to sz bytes of inflated object data in buf.  Bytes still
+ * pending in the header buffer, hdr[hdr_used..hdr_avail), are handed
+ * out first; the rest is inflated straight into buf.
+ *
+ * Returns the number of bytes stored, 0 once the stream has been fully
+ * read, or -1 on an inflate error (including deflated data that ends
+ * before the end of the stream).  After an error every later call
+ * returns -1 as well.
+ */
+static read_method_decl(loose)
+{
+       size_t total_read = 0;
+
+       switch (st->z_state) {
+       case z_done:
+               return 0;
+       case z_error:
+               return -1;
+       default:
+               break;
+       }
+
+       if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
+               size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
+               if (sz < to_copy)
+                       to_copy = sz;
+               memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
+               st->u.loose.hdr_used += to_copy;
+               total_read += to_copy;
+       }
+
+       while (total_read < sz) {
+               int status;
+
+               st->z.next_out = (unsigned char *)buf + total_read;
+               st->z.avail_out = sz - total_read;
+               status = git_inflate(&st->z, Z_FINISH);
+
+               total_read = st->z.next_out - (unsigned char *)buf;
+
+               if (status == Z_STREAM_END) {
+                       git_inflate_end(&st->z);
+                       st->z_state = z_done;
+                       break;
+               }
+               /*
+                * Z_BUF_ERROR is expected only when the caller's buffer
+                * has been filled.  If there is still room in buf, the
+                * inflater could not make progress because the deflated
+                * input ran out early; looping again would never end,
+                * so treat that as an error too.
+                */
+               if (status != Z_OK &&
+                   (status != Z_BUF_ERROR || total_read < sz)) {
+                       git_inflate_end(&st->z);
+                       st->z_state = z_error;
+                       return -1;
+               }
+       }
+       return total_read;
+}
+
+/*
+ * Close method for a loose object stream: shut down the deflated
+ * stream state, then unmap the object file.  Always returns 0.
+ */
+static close_method_decl(loose)
+{
+       close_deflated_stream(st);
+       munmap(st->u.loose.mapped, st->u.loose.mapsize);
+       return 0;
+}
+
+/* Method table installed by open_istream_loose(). */
+static struct stream_vtbl loose_vtbl = {
+       close_istream_loose,
+       read_istream_loose,
+};
+
+/*
+ * Open method for a loose object.
+ *
+ * Maps the object file, inflates the start of it into the header
+ * buffer and parses the object header to learn the object size.  Any
+ * inflated bytes that follow the NUL-terminated header are kept in
+ * hdr[hdr_used..hdr_avail) for the read method to hand out first.
+ *
+ * Returns 0 on success.  Returns -1, with the inflate state torn down
+ * and the file unmapped, when the object cannot be mapped or its
+ * header cannot be unpacked or parsed.
+ */
 static open_method_decl(loose)
 {
-       return -1; /* for now */
+       st->u.loose.mapped = map_sha1_file(sha1, &st->u.loose.mapsize);
+       if (!st->u.loose.mapped)
+               return -1;
+       /*
+        * A header that inflates but does not parse must be rejected as
+        * well; otherwise st->size would be left unset.
+        */
+       if (unpack_sha1_header(&st->z,
+                              st->u.loose.mapped,
+                              st->u.loose.mapsize,
+                              st->u.loose.hdr,
+                              sizeof(st->u.loose.hdr)) < 0 ||
+           parse_sha1_header(st->u.loose.hdr, &st->size) < 0) {
+               git_inflate_end(&st->z);
+               munmap(st->u.loose.mapped, st->u.loose.mapsize);
+               return -1;
+       }
+
+       /* skip past the header text and its terminating NUL */
+       st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
+       st->u.loose.hdr_avail = st->z.total_out;
+       st->z_state = z_used;
+
+       st->vtbl = &loose_vtbl;
+       return 0;
 }