From 5534ba31bf533e0e3bd57af3e5414e369d580eeb Mon Sep 17 00:00:00 2001 From: Prashanth Mundkur Date: Tue, 19 May 2009 15:51:53 -0700 Subject: [PATCH] add a payload callback to handle large direct-to-file payloads --- libs/http/http.ml | 62 ++++++++++++++++++++++++++++++++++------------ libs/http/http.mli | 12 ++++++--- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/libs/http/http.ml b/libs/http/http.ml index c1ecad4..088f7e9 100644 --- a/libs/http/http.ml +++ b/libs/http/http.ml @@ -690,6 +690,8 @@ end . payloads using multipart/byteranges (http://tools.ietf.org/html/rfc2616#section-19.2) *) +type payload_callback = string -> (* offset *) int -> (* length *) int -> (* final *) bool -> unit + module Payload = struct type cursor = | Start_chunk_length @@ -719,6 +721,11 @@ module Payload = struct | Length of int64 | Connection_close + (* if a payload callback is registered, this governs the max + amount of buffered data before the callback is called. + *) + let max_buffered_size = 2048 + type state = { mutable cursor: cursor; @@ -726,6 +733,7 @@ module Payload = struct mutable remaining_length: int64; max_payload_length: int64; mutable body: Buffer.t; + payload_callback: payload_callback option; mutable headers: header_fields; mutable num_bytes_parsed: int64 } @@ -784,27 +792,28 @@ module Payload = struct | Payload of state | Error of string - let init_default_state max_payload_length = + let init_default_state payload_callback max_payload_length = { cursor = In_body; content_length = Connection_close; remaining_length = -1L; max_payload_length = max_payload_length; body = Buffer.create 512; + payload_callback = payload_callback; headers = []; num_bytes_parsed = 0L } let default_max_payload_length = Int64.of_int (10*1024*1024) - let init_from_request ?(max_payload_length=default_max_payload_length) req = + let init_from_request ?payload_callback ?(max_payload_length=default_max_payload_length) req = let version = req.Request_header.version in let meth = req.Request_header.meth in let hdrs = req.Request_header.headers in let chunked = transfer_encoding_uses_chunked hdrs in let content_length = get_content_length version hdrs in let multipart_body = content_type_is_multipart_byteranges hdrs in - let default = init_default_state max_payload_length in + let default = init_default_state payload_callback max_payload_length in match version, meth, content_length, chunked, multipart_body with | HTTP09, _, _, _, _ @@ -830,14 +839,14 @@ module Payload = struct (* Default to assuming that the payload is terminated by a Connection:close. *) | _ -> Payload default - let init_from_response ?(max_payload_length=default_max_payload_length) resp = + let init_from_response ?payload_callback ?(max_payload_length=default_max_payload_length) resp = let version = resp.Response_header.version in let status_code = resp.Response_header.status_code in let hdrs = resp.Response_header.headers in let chunked = transfer_encoding_uses_chunked hdrs in let content_length = get_content_length version hdrs in let multipart_body = content_type_is_multipart_byteranges hdrs in - let default = init_default_state max_payload_length in + let default = init_default_state payload_callback max_payload_length in match status_code, content_length, chunked, multipart_body with | sc, _, _, _ @@ -865,15 +874,28 @@ module Payload = struct (* Default to assuming that the payload is terminated by a Connection:close. *) | _ -> Payload default + let check_payload_callback s final = + match s.payload_callback with + | None -> () + | Some f when Buffer.length s.body >= max_buffered_size -> + let content = Buffer.contents s.body in + Buffer.clear s.body; + f content 0 (String.length content) final + | _ -> () + let parse_char s c = dbg "parsing %C in state %s...\n" c (string_of_cursor s.cursor); let raise_bad_char () = raise_error (Parse_error (s.cursor, c)) in match s.cursor with | In_body -> + check_payload_callback s false; Buffer.add_char s.body c; if s.remaining_length > 0L then begin s.remaining_length <- Int64.pred s.remaining_length; - if s.remaining_length = 0L then s.cursor <- Done + if s.remaining_length = 0L then begin + s.cursor <- Done; + check_payload_callback s true + end end | Start_chunk_length -> if is_hex c then begin @@ -920,6 +942,7 @@ module Payload = struct | _ -> raise_bad_char () ) else begin + check_payload_callback s false; Buffer.add_char s.body c; s.remaining_length <- Int64.pred s.remaining_length end @@ -930,7 +953,8 @@ module Payload = struct Headers.parse_char hs c; if Headers.is_done hs then begin s.headers <- List.rev hs.Headers.headers; - s.cursor <- Done + s.cursor <- Done; + check_payload_callback s true end | Done -> raise_error (Internal_error "parse called on finished request!") @@ -967,8 +991,10 @@ module Payload = struct parse_substring state str 0 (String.length str) let connection_closed state = - if state.content_length = Connection_close then - state.cursor <- Done + if state.content_length = Connection_close then begin + state.cursor <- Done; + check_payload_callback state true + end let serialize ~chunked buf payload = if chunked then begin @@ -1005,6 +1031,7 @@ module Request = struct mutable s_request: Request_header.t option; mutable num_bytes_parsed: int64; header_callback: header_callback option; + payload_callback: payload_callback option; } type error = @@ -1018,12 +1045,13 @@ module Request = struct let raise_error err = raise (Http_error err) - let init_state ?header_callback () = + let init_state ?header_callback ?payload_callback () = { cursor = In_request_header (Request_header.init_state ()); s_request = None; num_bytes_parsed = 0L; - header_callback = header_callback + header_callback = header_callback; + payload_callback = payload_callback } let num_bytes_parsed s = @@ -1050,11 +1078,11 @@ module Request = struct | Request_header.Result (v, consumed) -> let v = (match state.header_callback with | None -> v - | Some f -> f v + | Some f -> f v ) in state.s_request <- Some v; state.num_bytes_parsed <- Int64.of_int (Request_header.num_bytes_parsed rs); - (match Payload.init_from_request v with + (match Payload.init_from_request ?payload_callback:state.payload_callback v with | Payload.No_payload -> state.cursor <- Done; Result ({ request = v; payload = None }, consumed) @@ -1113,6 +1141,7 @@ module Response = struct mutable s_response: Response_header.t option; mutable num_bytes_parsed: int64; header_callback: header_callback option; + payload_callback: payload_callback option } type error = @@ -1126,12 +1155,13 @@ module Response = struct let raise_error err = raise (Http_error err) - let init_state ?header_callback () = + let init_state ?header_callback ?payload_callback () = { cursor = In_response_header (Response_header.init_state ()); s_response = None; num_bytes_parsed = 0L; header_callback = header_callback; + payload_callback = payload_callback } let num_bytes_parsed s = @@ -1158,11 +1188,11 @@ module Response = struct | Response_header.Result (v, consumed) -> let v = (match state.header_callback with | None -> v - | Some f -> f v + | Some f -> f v ) in state.s_response <- Some v; state.num_bytes_parsed <- Int64.of_int (Response_header.num_bytes_parsed rs); - (match Payload.init_from_response v with + (match Payload.init_from_response ?payload_callback:state.payload_callback v with | Payload.No_payload -> state.cursor <- Done; Result ({ response = v; payload = None }, consumed) diff --git a/libs/http/http.mli b/libs/http/http.mli index 4c0f3e9..cc55cfb 100644 --- a/libs/http/http.mli +++ b/libs/http/http.mli @@ -73,6 +73,8 @@ module Response_header : sig val serialize : Buffer.t -> t -> unit end +type payload_callback = string -> (* offset *) int -> (* length *) int -> (* final *) bool -> unit + module Payload : sig type state val num_bytes_parsed : state -> int64 @@ -83,9 +85,11 @@ module Payload : sig | Error of string val init_from_response: - ?max_payload_length:int64 -> Response_header.t -> payload_type + ?payload_callback:payload_callback -> ?max_payload_length:int64 + -> Response_header.t -> payload_type val init_from_request: - ?max_payload_length:int64 -> Request_header.t -> payload_type + ?payload_callback:payload_callback -> ?max_payload_length:int64 + -> Request_header.t -> payload_type type error val string_of_error : error -> string @@ -114,7 +118,7 @@ module Request : sig type state type header_callback = Request_header.t -> Request_header.t - val init_state : ?header_callback:header_callback -> unit -> state + val init_state : ?header_callback:header_callback -> ?payload_callback:payload_callback -> unit -> state val num_bytes_parsed : state -> int64 type error @@ -142,7 +146,7 @@ module Response : sig type state type header_callback = Response_header.t -> Response_header.t - val init_state : ?header_callback:header_callback -> unit -> state + val init_state : ?header_callback:header_callback -> ?payload_callback:payload_callback -> unit -> state val num_bytes_parsed : state -> int64 type error = Internal_error of string -- 2.39.5