From: David Scott Date: Tue, 12 Oct 2010 09:34:27 +0000 (+0100) Subject: CA-44731: rewrite RRD marshalling, unmarshalling code to (i) copy the compact binary... X-Git-Url: http://xenbits.xensource.com/gitweb?a=commitdiff_plain;h=b0b8cfb62b0101ad56195b3c9d8c7a2da037deff;p=xcp%2Fxen-api.git CA-44731: rewrite RRD marshalling, unmarshalling code to (i) copy the compact binary representation; and then (ii) stream the XML directly to an fd without creating a temporary buffer of some kind. This should now work properly when the XML data is > 16MiB Signed-off-by: David Scott --- diff --git a/ocaml/xapi/monitor_rrds.ml b/ocaml/xapi/monitor_rrds.ml index e5865f70..8d2e74e2 100644 --- a/ocaml/xapi/monitor_rrds.ml +++ b/ocaml/xapi/monitor_rrds.ml @@ -92,20 +92,22 @@ open D let step = 5 +let rrd_of_fd fd = + let ic = Unix.in_channel_of_descr fd in + let input = Xmlm.make_input (`Channel ic) in + Rrd.from_xml input (** Helper function - path is the path to the file _without the extension .gz_ *) -let read_gzipped_file_with_fallback path = +let rrd_of_gzip path = let gz_path = path ^ ".gz" in let gz_exists = try Unix.stat gz_path; true with _ -> false in let result = ref "" in if gz_exists then begin Unixext.with_file gz_path [ Unix.O_RDONLY ] 0o0 - (fun fd -> Gzip.decompress_passive fd - (fun fd -> result := Unixext.string_of_fd fd)); - !result + (fun fd -> Gzip.decompress_passive fd rrd_of_fd) end else begin (* If this fails, let the exception propagate *) - Unixext.string_of_file path + Unixext.with_file path [ Unix.O_RDONLY ] 0 rrd_of_fd end @@ -165,11 +167,10 @@ let create_fresh_rrd use_min_max dss = let rrd = Rrd.rrd_create dss rras (Int64.of_int step) (Unix.gettimeofday()) in rrd -let send_rrd address to_archive uuid body = +let send_rrd address to_archive uuid rrd = debug "Sending RRD for object uuid=%s archiving=%b to address: %s" uuid to_archive address; let pool_secret = !Xapi_globs.pool_secret in - let length = Int64.of_int (String.length body) in - let headers = 
Xapi_http.http_request ~cookie:[ "pool_secret", pool_secret ] ~length + let headers = Xapi_http.http_request_request ~cookie:[ "pool_secret", pool_secret ] Http.Put address (Constants.rrd_put_uri^"?uuid="^uuid^(if to_archive then "&archive=true" else "")) in let st_proc = Xmlrpcclient.get_reusable_stunnel ~write_to_log:Xmlrpcclient.write_to_log address Xapi_globs.default_ssl_port in @@ -178,7 +179,7 @@ let send_rrd address to_archive uuid body = (fun () -> try debug "Sending rrd to %s" address; - let _, _ = Xmlrpcclient.http_rpc_fd fd headers body in + Http_client.rpc fd headers "" (fun _ fd -> Rrd.to_fd rrd fd); debug "Sent" with e -> debug "Caught exception: %s" (ExnHelper.string_of_exn e); @@ -187,7 +188,7 @@ let send_rrd address to_archive uuid body = (fun () -> Stunnel.disconnect st_proc) (* never called with the mutex held *) -let archive_rrd ?(save_stats_locally = Pool_role.is_master ()) uuid body = +let archive_rrd ?(save_stats_locally = Pool_role.is_master ()) uuid rrd = debug "Archiving RRD for object uuid=%s %s" uuid (if save_stats_locally then "to local disk" else "to remote master"); if save_stats_locally then begin try @@ -202,11 +203,7 @@ let archive_rrd ?(save_stats_locally = Pool_role.is_master ()) uuid body = Unixext.mkdir_safe Xapi_globs.xapi_rrd_location 0o755; let base_filename = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in Unixext.atomic_write_to_file (base_filename ^ ".gz") 0o644 - (fun fd -> - Gzip.compress fd (fun fd -> - let len = String.length body in - let written = Unix.write fd body 0 len in - if written <> len then (failwith "Short write occured!"))); + (fun fd -> Gzip.compress fd (Rrd.to_fd rrd)); Unixext.unlink_safe base_filename (* If there's an uncompressed one hanging around, remove it *) end else begin debug "No local storage: not persisting RRDs" @@ -219,7 +216,7 @@ let archive_rrd ?(save_stats_locally = Pool_role.is_master ()) uuid body = (* Stream it to the master to store, or maybe to a host in the migrate case *) let 
master_address = Pool_role.get_master_address () in debug "About to send"; - send_rrd master_address true uuid body + send_rrd master_address true uuid rrd end (* This is where we add the update hook that updates the metrics classes every @@ -264,7 +261,7 @@ let send_host_rrd_to_master () = match !host_rrd with | Some rrdi -> debug "sending host RRD to master"; - let rrd = Mutex.execute mutex (fun () -> Rrd.to_string rrdi.rrd) in + let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd rrdi.rrd) in archive_rrd (Helpers.get_localhost_uuid ()) ~save_stats_locally:false rrd | None -> () @@ -287,13 +284,13 @@ let backup ?(save_stats_locally=true) () = Mutex.unlock mutex; List.iter (fun (uuid,rrd) -> debug "Backup: saving RRD for VM uuid=%s to local disk" uuid; - let rrd = Mutex.execute mutex (fun () -> Rrd.to_string rrd) in + let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd rrd) in archive_rrd uuid ~save_stats_locally rrd) vrrds; match !host_rrd with | Some rrdi -> debug "Backup: saving RRD for host to local disk"; - let rrd = Mutex.execute mutex (fun () -> Rrd.to_string rrdi.rrd) in + let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd rrdi.rrd) in archive_rrd (Helpers.get_localhost_uuid ()) ~save_stats_locally rrd | None -> () end else begin @@ -329,7 +326,7 @@ let migrate_push ~__context vm_uuid host = Hashtbl.remove vm_rrds vm_uuid; rrdi) in - send_rrd address false vm_uuid (Rrd.to_string rrdi.rrd) + send_rrd address false vm_uuid rrdi.rrd @@ -338,28 +335,24 @@ let migrate_push ~__context vm_uuid host = let push_rrd ~__context uuid = try let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in - let body = read_gzipped_file_with_fallback path in + let rrd = rrd_of_gzip path in debug "Pushing RRD for VM uuid=%s" uuid; let vm = Db.VM.get_by_uuid ~__context ~uuid in let host = Db.VM.get_resident_on ~__context ~self:vm in if host = Helpers.get_localhost ~__context then begin - let input = Xmlm.make_input (`String (0, body)) in - let rrd = Rrd.from_xml input in 
Mutex.execute mutex (fun () -> Hashtbl.replace vm_rrds uuid {rrd=rrd; dss=[]}) end else begin (* Host might be OpaqueRef:null, in which case we'll fail silently *) let address = Db.Host.get_address ~__context ~self:host in - send_rrd address false uuid body + send_rrd address false uuid (Rrd.copy_rrd rrd) end with _ -> () (* Load an RRD from the local filesystem. Will return an RRD or throw an exception. *) let load_rrd_from_local_filesystem ~__context uuid = - let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in - let body = read_gzipped_file_with_fallback path in debug "Loading RRD from local filesystem for object uuid=%s" uuid; - let input = Xmlm.make_input (`String (0, body)) in - Rrd.from_xml input + let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in + rrd_of_gzip path (* Fetch an RRD from the master *) let pull_rrd_from_master ~__context uuid is_host = @@ -425,21 +418,20 @@ let load_rrd ~__context uuid is_host = with _ -> () (** Receive handler, for RRDs being pushed onto us *) -exception Uuid_not_found exception Invalid_RRD let receive_handler (req: Http.request) (bio: Buf_io.t) = + debug "Monitor_rrds.receive_handler"; let query = req.Http.query in req.Http.close <- true; - debug "RRD receive handler"; + let fd = Buf_io.fd_of bio in (* fd only used for writing *) if not(List.mem_assoc "uuid" query) then begin error "HTTP request for RRD lacked 'uuid' parameter"; - failwith "Bad request" + Http_svr.headers fd Http.http_400_badrequest; + failwith "Monitor_rrds.receive_handler: Bad request" end; - let fd = Buf_io.fd_of bio in (* fd only used for writing *) Xapi_http.with_context ~dummy:true "Receiving VM rrd" req fd (fun __context -> - try let uuid = List.assoc "uuid" query in (* Check to see if it's a valid uuid for a host or VM *) @@ -449,16 +441,16 @@ let receive_handler (req: Http.request) (bio: Buf_io.t) = with _ -> begin try ignore(Db.Host.get_by_uuid ~__context ~uuid); Host with _ -> - raise Uuid_not_found + Http_svr.headers fd 
Http.http_404_missing; + failwith (Printf.sprintf "Monitor_rrds.receive_handler: UUID %s neither host nor VM" uuid) end end in + (* Tell client we're good to receive *) + Http_svr.headers fd (Http.http_200_ok ()); (* Now we know what sort of RRD it is, read it in and validate it *) - let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rrd_size req bio in - - let input = Xmlm.make_input (`String (0, body)) in - let rrd = try Rrd.from_xml input with _ -> raise Invalid_RRD in + let rrd = rrd_of_fd fd in (* By now we know it's a valid RRD *) let to_archive = List.mem_assoc "archive" query in @@ -472,21 +464,16 @@ let receive_handler (req: Http.request) (bio: Buf_io.t) = end else begin debug "Receiving RRD for archiving, type=%s" (match ty with Host -> "Host" | VM uuid -> Printf.sprintf "VM uuid=%s" uuid | _ -> "Unknown"); - archive_rrd uuid body + archive_rrd uuid (Rrd.copy_rrd rrd) end; - Http_svr.headers fd (Http.http_200_ok ()) - with - | Invalid_RRD -> - Http_svr.headers fd (Http.http_400_badrequest) - | Uuid_not_found -> - Http_svr.headers fd (Http.http_400_badrequest) + ) (** Send handler, for sending out requested RRDs *) let handler (req: Http.request) s = + debug "Monitor_rrds.handler"; let query = req.Http.query in req.Http.close <- true; - debug "RRD handler"; if not(List.mem_assoc "ref" query) && not(List.mem_assoc "uuid" query) then begin error "HTTP request for RRD lacked 'uuid' parameter"; failwith "Bad request" @@ -496,15 +483,9 @@ let handler (req: Http.request) s = let uuid = List.assoc "uuid" query in (* If the uuid is in our hashtbl, we own the data and we'll just respond with it *) try - let xml = Mutex.execute mutex (fun () -> - let rrd = Hashtbl.find vm_rrds uuid in - Rrd.to_string rrd.rrd) - in - Http_svr.headers s - (Http.http_200_ok_with_content - (Int64.of_int (String.length xml)) - ~version:"HTTP/1.1" ~keep_alive:false ()); - ignore(Unix.write s xml 0 (String.length xml)) + let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd 
(Hashtbl.find vm_rrds uuid).rrd) in + Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ()); + Rrd.to_fd rrd s with | Not_found -> (* If we're not the master, redirect to the master *) @@ -533,18 +514,16 @@ let handler (req: Http.request) s = (* it's off, and we're the master, so unarchive the rrd and send it off (if it's there) *) let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in - let body = read_gzipped_file_with_fallback path in - Http_svr.headers s (Http.http_200_ok_with_content - (Int64.of_int (String.length body)) - ~version:"HTTP/1.1" ~keep_alive:false ()); - ignore(Unix.write s body 0 (String.length body)) + let rrd = rrd_of_gzip path in + Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ()); + Rrd.to_fd rrd s end) (** Send handler, for sending out requested host RRDs *) let handler_host (req: Http.request) s = + debug "Monitor_rrds.handler_host"; let query = req.Http.query in req.Http.close <- true; - debug "RRD host handler"; Xapi_http.with_context ~dummy:true "Obtaining the Host RRD statistics" req s (fun __context -> (* This is only used by hosts when booting - not for public use! 
*) @@ -555,24 +534,18 @@ let handler_host (req: Http.request) s = end else begin let uuid = List.assoc "uuid" query in let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in - let body = read_gzipped_file_with_fallback path in - Http_svr.headers s (Http.http_200_ok_with_content - (Int64.of_int (String.length body)) - ~version:"HTTP/1.1" ~keep_alive:false ()); - ignore(Unix.write s body 0 (String.length body)) + let rrd = rrd_of_gzip path in + Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ()); + Rrd.to_fd rrd s; end end else begin - let xml = Mutex.execute mutex + let rrd = Mutex.execute mutex (fun () -> debug "Received request for Host RRD"; - let rrd = match !host_rrd with Some rrdi -> rrdi.rrd | None -> failwith "No host RRD available" in - let json = List.mem_assoc "json" query in - Rrd.to_string rrd ~json) in - Http_svr.headers s - (Http.http_200_ok_with_content - (Int64.of_int (String.length xml)) - ~version:"HTTP/1.1" ~keep_alive:false ()); - ignore(Unix.write s xml 0 (String.length xml)) + Rrd.copy_rrd (match !host_rrd with Some rrdi -> rrdi.rrd | None -> failwith "No host RRD available") + ) in + Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ()); + Rrd.to_fd ~json:(List.mem_assoc "json" query) rrd s end) @@ -595,6 +568,7 @@ let get_host_stats ?(json=false) start interval cfopt host uuid = Rrd.export ~json prefixandrrds start interval cfopt) let handler_rrd_updates (req: Http.request) s = + (* This is commonly-called: not worth logging *) let query = req.Http.query in req.Http.close <- true; Xapi_http.with_context ~dummy:true "Obtaining the Host RRD statistics" req s @@ -614,7 +588,7 @@ let handler_rrd_updates (req: Http.request) s = let headers = (Http.http_200_ok_with_content (Int64.of_int (String.length xml)) - ~version:"HTTP/1.1" ~keep_alive:false ()) + ~version:"1.1" ~keep_alive:false ()) in let headers = if json then headers else (headers@["Content-Type: text/xml"]) in Http_svr.headers s headers; @@ 
-789,7 +763,7 @@ let update_rrds ~__context timestamp dss uuids pifs rebooting_vms paused_vms = ) in - List.iter (fun (uuid,rrd) -> debug "Sending back RRD for VM uuid=%s" uuid; archive_rrd uuid (Rrd.to_string rrd.rrd)) to_send_back + List.iter (fun (uuid,rrd) -> debug "Sending back RRD for VM uuid=%s" uuid; archive_rrd uuid rrd.rrd) to_send_back let query_possible_dss rrdi = diff --git a/ocaml/xapi/rrd.ml b/ocaml/xapi/rrd.ml index aaac120a..3acc4102 100644 --- a/ocaml/xapi/rrd.ml +++ b/ocaml/xapi/rrd.ml @@ -21,6 +21,7 @@ *) open Listext +open Pervasiveext (* We're not currently printing any debug data in this module. This is commented out so that we can link a standalone binary with this without bringing in logs @@ -92,7 +93,6 @@ type rra = { mutable rra_updatehook : (rrd -> int -> unit) option; (** Hook that gets called when an update happens *) } - (** DS - a data source This defines how we deal with incoming data. Type is one of: @@ -123,6 +123,44 @@ and rrd = { rrd_rras: rra array; } +let copy_cdp_prep x = +{ + cdp_value = x.cdp_value; + cdp_unknown_pdps = x.cdp_unknown_pdps; +} + +let copy_rra x = +{ + rra_cf = x.rra_cf; + rra_row_cnt = x.rra_row_cnt; + rra_pdp_cnt = x.rra_pdp_cnt; + rra_xff = x.rra_xff; + rra_data = Array.map Fring.copy x.rra_data; + rra_cdps = Array.map copy_cdp_prep x.rra_cdps; + rra_updatehook = x.rra_updatehook +} + + +let copy_ds x = +{ + ds_name = x.ds_name; (* not mutable *) + ds_ty = x.ds_ty; + ds_min = x.ds_min; + ds_max = x.ds_max; + ds_mrhb = x.ds_mrhb; + ds_last = x.ds_last; + ds_value = x.ds_value; + ds_unknown_sec = x.ds_unknown_sec; +} + +let copy_rrd x = +{ + last_updated = x.last_updated; + timestep = x.timestep; + rrd_dss = Array.map copy_ds x.rrd_dss; + rrd_rras = Array.map copy_rra x.rrd_rras; +} + (** Helper function to get the start time and age of the current/last PDP *) let get_times time timestep = let starttime = Int64.mul timestep (Int64.div (Int64.of_float time) timestep) in @@ -704,16 +742,28 @@ let from_xml 
input = inner rrd n) rrd removals_required | _ -> failwith "Bad xml!" -let to_xml output rrd = - let tag n next () = +(** Write the XML representation of [rrd] directly to the file descriptor [fd] *) +let xml_to_fd rrd fd = + (* We use an output channel for Xmlm-compat buffered output. Provided we flush + at the end we should be safe. *) + let with_out_channel_output fd f = + let oc = Unix.out_channel_of_descr fd in + finally + (fun () -> + let output = Xmlm.make_output (`Channel oc) in + f output + ) + (fun () -> flush oc) in + + let tag n next output = Xmlm.output output (`El_start (("",n),[])); - List.iter (fun x -> x ()) next; + List.iter (fun x -> x output) next; Xmlm.output output (`El_end) in - let tags n next () = - List.iter (fun next -> tag n next ()) next + let tags n next output = + List.iter (fun next -> tag n next output) next in - let data dat () = Xmlm.output output (`Data dat) in + let data dat output = Xmlm.output output (`Data dat) in let do_dss ds_list = [tags "ds" (List.map (fun ds -> [ @@ -758,17 +808,22 @@ let to_xml output rrd = tag "database" (do_database rra.rra_data)]) rra_list)] in - Xmlm.output output (`Dtd None); - tag "rrd" - (List.concat - [[tag "version" [data "0003"]; - tag "step" [data (Int64.to_string rrd.timestep)]; - tag "lastupdate" [data (Printf.sprintf "%Ld" (Int64.of_float (rrd.last_updated)))]]; - do_dss (Array.to_list rrd.rrd_dss); - do_rras (Array.to_list rrd.rrd_rras); - ]) () - -let to_json rrd = + with_out_channel_output fd + (fun output -> + Xmlm.output output (`Dtd None); + tag "rrd" + (List.concat + [[tag "version" [data "0003"]; + tag "step" [data (Int64.to_string rrd.timestep)]; + tag "lastupdate" [data (Printf.sprintf "%Ld" (Int64.of_float (rrd.last_updated)))]]; + do_dss (Array.to_list rrd.rrd_dss); + do_rras (Array.to_list rrd.rrd_rras); + ]) output + ) + +let json_to_fd rrd fd = + let write x = if Unix.write fd x 0 (String.length x) <> String.length x then failwith "json_to_fd: short write" in + let do_dss 
ds_list = "ds:["^(String.concat "," (List.map (fun ds -> "{name:\""^ds.ds_name^"\",type:\""^(match ds.ds_ty with Gauge -> "GAUGE" | Absolute -> "ABSOLUTE" | Derive -> "DERIVE")^ @@ -805,18 +860,36 @@ let to_json rrd = "},database:"^(do_database rra.rra_data)) rra_list))^"}]" in - "{version: \"0003\",step:"^(Int64.to_string rrd.timestep)^",lastupdate:"^ - (f_to_s rrd.last_updated)^","^(do_dss (Array.to_list rrd.rrd_dss))^","^ - do_rras (Array.to_list rrd.rrd_rras)^"}" + write "{version: \"0003\",step:"; + write (Int64.to_string rrd.timestep); + write ",lastupdate:"; + write (f_to_s rrd.last_updated); + write ","; + write (do_dss (Array.to_list rrd.rrd_dss)); + write ","; + write (do_rras (Array.to_list rrd.rrd_rras)^"}") (* XXX need to split this *) + +let iter_to_string_list f x = + let acc = ref [] in + f x (fun string -> acc := string :: !acc); + List.rev !acc + +(* +(* XXX: we copy and return to avoid holding locks: this is why we aren't exposing + an iter/fold interface here. It would be better to copy the original (compact) + data than to copy the expanded version. *) +let to_bigbuffer ?(json=false) rrd = + let b = Bigbuffer.make () in + begin + if json + then Bigbuffer.append_string b (to_json rrd) (* XXX: might be too big *) + else iter_over_xml rrd (Bigbuffer.append_string b) + end; + b +*) + +let to_fd ?(json=false) rrd fd = (if json then json_to_fd else xml_to_fd) rrd fd -let to_string ?(json=false) rrd = - if json then - to_json rrd - else - (let buffer = Buffer.create 10 in - let output = Xmlm.make_output (`Buffer buffer) in - to_xml output rrd; - Buffer.contents buffer) (** WARNING WARNING: Do not call the following function from within xapi! 
*) let text_export rrd grouping = diff --git a/ocaml/xapi/xapi_blob.ml b/ocaml/xapi/xapi_blob.ml index 0a7b08be..164fd937 100644 --- a/ocaml/xapi/xapi_blob.ml +++ b/ocaml/xapi/xapi_blob.ml @@ -67,7 +67,7 @@ let handler (req: Http.request) s = let ifd = Unix.openfile path [Unix.O_RDONLY] 0o600 in let size = (Unix.LargeFile.stat path).Unix.LargeFile.st_size in Http_svr.headers s ((Http.http_200_ok_with_content - size ~version:"HTTP/1.1" ~keep_alive:false ()) + size ~version:"1.1" ~keep_alive:false ()) @ ["Content-Type: "^(Db.Blob.get_mime_type ~__context ~self)]); ignore(Pervasiveext.finally (fun () -> Unixext.copy_file ifd s) diff --git a/ocaml/xapi/xapi_http.ml b/ocaml/xapi/xapi_http.ml index 5e48ed46..2676412e 100644 --- a/ocaml/xapi/xapi_http.ml +++ b/ocaml/xapi/xapi_http.ml @@ -214,6 +214,8 @@ let with_context ?(dummy=false) label (req: request) (s: Unix.file_descr) f = (* Other exceptions are dealt with by the Http_svr module's exception handler *) let http_request = Http.http_request ~user_agent:Xapi_globs.xapi_user_agent +let http_request_request = Http.http_request_request ~user_agent:Xapi_globs.xapi_user_agent + let svr_bind = Http_svr.bind ~listen_backlog:Xapi_globs.listen_backlog let add_handler (name, handler) = diff --git a/ocaml/xapi/xapi_message.ml b/ocaml/xapi/xapi_message.ml index 00af3d03..6438a82e 100644 --- a/ocaml/xapi/xapi_message.ml +++ b/ocaml/xapi/xapi_message.ml @@ -434,5 +434,5 @@ let handler (req: Http.request) (bio: Buf_io.t) = let body = "" ^ body in Http_svr.headers s ((Http.http_200_ok_with_content (Int64.of_int (String.length body)) - ~version:"HTTP/1.1" ~keep_alive:false ())@["Content-Type: application/rss+xml"]); + ~version:"1.1" ~keep_alive:false ())@["Content-Type: application/rss+xml"]); ignore(Unix.write s body 0 (String.length body)))