]> xenbits.xensource.com Git - xenclient/toolstack.git/commitdiff
add a payload callback to handle large direct-to-file payloads
authorPrashanth Mundkur <prashanth.mundkur@citrix.com>
Tue, 19 May 2009 22:51:53 +0000 (15:51 -0700)
committerPrashanth Mundkur <prashanth.mundkur@citrix.com>
Tue, 19 May 2009 22:51:53 +0000 (15:51 -0700)
libs/http/http.ml
libs/http/http.mli

index c1ecad4f99e5568546496f106e5d0d1565bfa5c3..088f7e9a1e868ca1f517d03b9d28744ccf051442 100644 (file)
@@ -690,6 +690,8 @@ end
    . payloads using multipart/byteranges (http://tools.ietf.org/html/rfc2616#section-19.2)
 *)
 
+type payload_callback = string -> (* offset *) int -> (* length *) int -> (* final *) bool -> unit
+
 module Payload = struct
        type cursor =
                | Start_chunk_length
@@ -719,6 +721,11 @@ module Payload = struct
                | Length of int64
                | Connection_close
 
+       (* if a payload callback is registered, this governs the max
+          amount of buffered data before the callback is called.
+       *)
+       let max_buffered_size = 2048
+
        type state =
        {
                mutable cursor: cursor;
@@ -726,6 +733,7 @@ module Payload = struct
                mutable remaining_length: int64;
                max_payload_length: int64;
                mutable body: Buffer.t;
+               payload_callback: payload_callback option;
                mutable headers: header_fields;
                mutable num_bytes_parsed: int64
        }
@@ -784,27 +792,28 @@ module Payload = struct
                | Payload of state
                | Error of string
 
-       let init_default_state max_payload_length =
+       let init_default_state payload_callback max_payload_length =
        {
                cursor = In_body;
                content_length = Connection_close;
                remaining_length = -1L;
                max_payload_length = max_payload_length;
                body = Buffer.create 512;
+               payload_callback = payload_callback;
                headers = [];
                num_bytes_parsed = 0L
        }
 
        let default_max_payload_length = Int64.of_int (10*1024*1024)
 
-       let init_from_request ?(max_payload_length=default_max_payload_length) req =
+       let init_from_request ?payload_callback ?(max_payload_length=default_max_payload_length) req =
                let version = req.Request_header.version in
                let meth = req.Request_header.meth in
                let hdrs = req.Request_header.headers in
                let chunked = transfer_encoding_uses_chunked hdrs in
                let content_length = get_content_length version hdrs in
                let multipart_body = content_type_is_multipart_byteranges hdrs in
-               let default = init_default_state max_payload_length in
+               let default = init_default_state payload_callback max_payload_length in
 
                match version, meth, content_length, chunked, multipart_body with
                | HTTP09, _, _, _, _
@@ -830,14 +839,14 @@ module Payload = struct
                (* Default to assuming that the payload is terminated by a Connection:close. *)
                | _                             -> Payload default
 
-       let init_from_response ?(max_payload_length=default_max_payload_length) resp =
+       let init_from_response ?payload_callback ?(max_payload_length=default_max_payload_length) resp =
                let version = resp.Response_header.version in
                let status_code = resp.Response_header.status_code in
                let hdrs = resp.Response_header.headers in
                let chunked = transfer_encoding_uses_chunked hdrs in
                let content_length = get_content_length version hdrs in
                let multipart_body = content_type_is_multipart_byteranges hdrs in
-               let default = init_default_state max_payload_length in
+               let default = init_default_state payload_callback max_payload_length in
 
                match status_code, content_length, chunked, multipart_body with
                | sc, _, _, _
@@ -865,15 +874,28 @@ module Payload = struct
                (* Default to assuming that the payload is terminated by a Connection:close. *)
                | _ ->                 Payload default
 
+       let check_payload_callback s final =
+               match s.payload_callback with
+               | None -> ()
+               | Some f when Buffer.length s.body >= max_buffered_size ->
+                       let content = Buffer.contents s.body in
+                       Buffer.clear s.body;
+                       f content 0 (String.length content) final
+               | _ -> ()
+
        let parse_char s c =
                dbg "parsing %C in state %s...\n" c (string_of_cursor s.cursor);
                let raise_bad_char () = raise_error (Parse_error (s.cursor, c)) in
                match s.cursor with
                | In_body ->
+                       check_payload_callback s false;
                        Buffer.add_char s.body c;
                        if s.remaining_length > 0L then begin
                                s.remaining_length <- Int64.pred s.remaining_length;
-                               if s.remaining_length = 0L then s.cursor <- Done
+                               if s.remaining_length = 0L then begin
+                                       s.cursor <- Done;
+                                       check_payload_callback s true
+                               end
                        end
                | Start_chunk_length ->
                        if is_hex c then begin
@@ -920,6 +942,7 @@ module Payload = struct
                                 | _    -> raise_bad_char ()
                                )
                        else begin
+                               check_payload_callback s false;
                                Buffer.add_char s.body c;
                                s.remaining_length <- Int64.pred s.remaining_length
                        end
@@ -930,7 +953,8 @@ module Payload = struct
                        Headers.parse_char hs c;
                        if Headers.is_done hs then begin
                                s.headers <- List.rev hs.Headers.headers;
-                               s.cursor <- Done
+                               s.cursor <- Done;
+                               check_payload_callback s true
                        end
                | Done -> raise_error (Internal_error "parse called on finished request!")
 
@@ -967,8 +991,10 @@ module Payload = struct
                parse_substring state str 0 (String.length str)
 
        let connection_closed state =
-               if state.content_length = Connection_close then
-                       state.cursor <- Done
+               if state.content_length = Connection_close then begin
+                       state.cursor <- Done;
+                       check_payload_callback state true
+               end
 
        let serialize ~chunked buf payload =
                if chunked then begin
@@ -1005,6 +1031,7 @@ module Request = struct
                mutable s_request: Request_header.t option;
                mutable num_bytes_parsed: int64;
                header_callback: header_callback option;
+               payload_callback: payload_callback option;
        }
 
        type error =
@@ -1018,12 +1045,13 @@ module Request = struct
 
        let raise_error err = raise (Http_error err)
 
-       let init_state ?header_callback () =
+       let init_state ?header_callback ?payload_callback () =
        {
                cursor = In_request_header (Request_header.init_state ());
                s_request = None;
                num_bytes_parsed = 0L;
-               header_callback = header_callback
+               header_callback = header_callback;
+               payload_callback = payload_callback
        }
 
        let num_bytes_parsed s =
@@ -1050,11 +1078,11 @@ module Request = struct
                         | Request_header.Result (v, consumed) ->
                                let v = (match state.header_callback with
                                         | None -> v
-                                        | Some f -> f v 
+                                        | Some f -> f v
                                        ) in
                                state.s_request <- Some v;
                                state.num_bytes_parsed <- Int64.of_int (Request_header.num_bytes_parsed rs);
-                               (match Payload.init_from_request v with
+                               (match Payload.init_from_request ?payload_callback:state.payload_callback v with
                                 | Payload.No_payload ->
                                        state.cursor <- Done;
                                        Result ({ request = v; payload = None }, consumed)
@@ -1113,6 +1141,7 @@ module Response = struct
                mutable s_response: Response_header.t option;
                mutable num_bytes_parsed: int64;
                header_callback: header_callback option;
+               payload_callback: payload_callback option
        }
 
        type error =
@@ -1126,12 +1155,13 @@ module Response = struct
 
        let raise_error err = raise (Http_error err)
 
-       let init_state ?header_callback () =
+       let init_state ?header_callback ?payload_callback () =
        {
                cursor = In_response_header (Response_header.init_state ());
                s_response = None;
                num_bytes_parsed = 0L;
                header_callback = header_callback;
+               payload_callback = payload_callback
        }
 
        let num_bytes_parsed s =
@@ -1158,11 +1188,11 @@ module Response = struct
                         | Response_header.Result (v, consumed) ->
                                let v = (match state.header_callback with
                                         | None -> v
-                                        | Some f -> f v 
+                                        | Some f -> f v
                                        ) in
                                state.s_response <- Some v;
                                state.num_bytes_parsed <- Int64.of_int (Response_header.num_bytes_parsed rs);
-                               (match Payload.init_from_response v with
+                               (match Payload.init_from_response ?payload_callback:state.payload_callback v with
                                 | Payload.No_payload ->
                                        state.cursor <- Done;
                                        Result ({ response = v; payload = None }, consumed)
index 4c0f3e9cfd03beaa59fc8e85099ce26a52b465b2..cc55cfbe040b8e0dc9c39b1db81e39f7610c0778 100644 (file)
@@ -73,6 +73,8 @@ module Response_header : sig
        val serialize : Buffer.t -> t -> unit
 end
 
+type payload_callback = string -> (* offset *) int -> (* length *) int -> (* final *) bool -> unit
+
 module Payload : sig
        type state
        val num_bytes_parsed : state -> int64
@@ -83,9 +85,11 @@ module Payload : sig
                | Error of string
 
        val init_from_response:
-               ?max_payload_length:int64 -> Response_header.t -> payload_type
+               ?payload_callback:payload_callback -> ?max_payload_length:int64
+               -> Response_header.t -> payload_type
        val init_from_request:
-               ?max_payload_length:int64 -> Request_header.t -> payload_type
+               ?payload_callback:payload_callback -> ?max_payload_length:int64
+               -> Request_header.t -> payload_type
 
        type error
        val string_of_error : error -> string
@@ -114,7 +118,7 @@ module Request : sig
        type state
        type header_callback = Request_header.t -> Request_header.t
 
-       val init_state : ?header_callback:header_callback -> unit -> state
+       val init_state : ?header_callback:header_callback -> ?payload_callback:payload_callback -> unit -> state
        val num_bytes_parsed : state -> int64
 
        type error
@@ -142,7 +146,7 @@ module Response : sig
        type state
        type header_callback = Response_header.t -> Response_header.t
 
-       val init_state : ?header_callback:header_callback -> unit -> state
+       val init_state : ?header_callback:header_callback -> ?payload_callback:payload_callback -> unit -> state
        val num_bytes_parsed : state -> int64
 
        type error = Internal_error of string