. payloads using multipart/byteranges (http://tools.ietf.org/html/rfc2616#section-19.2)
*)
+type payload_callback = string -> (* offset *) int -> (* length *) int -> (* final *) bool -> unit
+
module Payload = struct
type cursor =
| Start_chunk_length
| Length of int64
| Connection_close
+ (* if a payload callback is registered, this governs the max
+ amount of buffered data before the callback is called.
+ *)
+ let max_buffered_size = 2048
+
type state =
{
mutable cursor: cursor;
mutable remaining_length: int64;
max_payload_length: int64;
mutable body: Buffer.t;
+ payload_callback: payload_callback option;
mutable headers: header_fields;
mutable num_bytes_parsed: int64
}
| Payload of state
| Error of string
- let init_default_state max_payload_length =
+ let init_default_state payload_callback max_payload_length =
{
cursor = In_body;
content_length = Connection_close;
remaining_length = -1L;
max_payload_length = max_payload_length;
body = Buffer.create 512;
+ payload_callback = payload_callback;
headers = [];
num_bytes_parsed = 0L
}
let default_max_payload_length = Int64.of_int (10*1024*1024)
- let init_from_request ?(max_payload_length=default_max_payload_length) req =
+ let init_from_request ?payload_callback ?(max_payload_length=default_max_payload_length) req =
let version = req.Request_header.version in
let meth = req.Request_header.meth in
let hdrs = req.Request_header.headers in
let chunked = transfer_encoding_uses_chunked hdrs in
let content_length = get_content_length version hdrs in
let multipart_body = content_type_is_multipart_byteranges hdrs in
- let default = init_default_state max_payload_length in
+ let default = init_default_state payload_callback max_payload_length in
match version, meth, content_length, chunked, multipart_body with
| HTTP09, _, _, _, _
(* Default to assuming that the payload is terminated by a Connection:close. *)
| _ -> Payload default
- let init_from_response ?(max_payload_length=default_max_payload_length) resp =
+ let init_from_response ?payload_callback ?(max_payload_length=default_max_payload_length) resp =
let version = resp.Response_header.version in
let status_code = resp.Response_header.status_code in
let hdrs = resp.Response_header.headers in
let chunked = transfer_encoding_uses_chunked hdrs in
let content_length = get_content_length version hdrs in
let multipart_body = content_type_is_multipart_byteranges hdrs in
- let default = init_default_state max_payload_length in
+ let default = init_default_state payload_callback max_payload_length in
match status_code, content_length, chunked, multipart_body with
| sc, _, _, _
(* Default to assuming that the payload is terminated by a Connection:close. *)
| _ -> Payload default
+ let check_payload_callback s final =
+ match s.payload_callback with
+ | None -> ()
+ | Some f when Buffer.length s.body >= max_buffered_size ->
+ let content = Buffer.contents s.body in
+ Buffer.clear s.body;
+ f content 0 (String.length content) final
+ | _ -> ()
+
let parse_char s c =
dbg "parsing %C in state %s...\n" c (string_of_cursor s.cursor);
let raise_bad_char () = raise_error (Parse_error (s.cursor, c)) in
match s.cursor with
| In_body ->
+ check_payload_callback s false;
Buffer.add_char s.body c;
if s.remaining_length > 0L then begin
s.remaining_length <- Int64.pred s.remaining_length;
- if s.remaining_length = 0L then s.cursor <- Done
+ if s.remaining_length = 0L then begin
+ s.cursor <- Done;
+ check_payload_callback s true
+ end
end
| Start_chunk_length ->
if is_hex c then begin
| _ -> raise_bad_char ()
)
else begin
+ check_payload_callback s false;
Buffer.add_char s.body c;
s.remaining_length <- Int64.pred s.remaining_length
end
Headers.parse_char hs c;
if Headers.is_done hs then begin
s.headers <- List.rev hs.Headers.headers;
- s.cursor <- Done
+ s.cursor <- Done;
+ check_payload_callback s true
end
| Done -> raise_error (Internal_error "parse called on finished request!")
parse_substring state str 0 (String.length str)
let connection_closed state =
- if state.content_length = Connection_close then
- state.cursor <- Done
+ if state.content_length = Connection_close then begin
+ state.cursor <- Done;
+ check_payload_callback state true
+ end
let serialize ~chunked buf payload =
if chunked then begin
mutable s_request: Request_header.t option;
mutable num_bytes_parsed: int64;
header_callback: header_callback option;
+ payload_callback: payload_callback option;
}
type error =
let raise_error err = raise (Http_error err)
- let init_state ?header_callback () =
+ let init_state ?header_callback ?payload_callback () =
{
cursor = In_request_header (Request_header.init_state ());
s_request = None;
num_bytes_parsed = 0L;
- header_callback = header_callback
+ header_callback = header_callback;
+ payload_callback = payload_callback
}
let num_bytes_parsed s =
| Request_header.Result (v, consumed) ->
let v = (match state.header_callback with
| None -> v
- | Some f -> f v
+ | Some f -> f v
) in
state.s_request <- Some v;
state.num_bytes_parsed <- Int64.of_int (Request_header.num_bytes_parsed rs);
- (match Payload.init_from_request v with
+ (match Payload.init_from_request ?payload_callback:state.payload_callback v with
| Payload.No_payload ->
state.cursor <- Done;
Result ({ request = v; payload = None }, consumed)
mutable s_response: Response_header.t option;
mutable num_bytes_parsed: int64;
header_callback: header_callback option;
+ payload_callback: payload_callback option
}
type error =
let raise_error err = raise (Http_error err)
- let init_state ?header_callback () =
+ let init_state ?header_callback ?payload_callback () =
{
cursor = In_response_header (Response_header.init_state ());
s_response = None;
num_bytes_parsed = 0L;
header_callback = header_callback;
+ payload_callback = payload_callback
}
let num_bytes_parsed s =
| Response_header.Result (v, consumed) ->
let v = (match state.header_callback with
| None -> v
- | Some f -> f v
+ | Some f -> f v
) in
state.s_response <- Some v;
state.num_bytes_parsed <- Int64.of_int (Response_header.num_bytes_parsed rs);
- (match Payload.init_from_response v with
+ (match Payload.init_from_response ?payload_callback:state.payload_callback v with
| Payload.No_payload ->
state.cursor <- Done;
Result ({ response = v; payload = None }, consumed)