let step = 5
+let rrd_of_fd fd =
+ let ic = Unix.in_channel_of_descr fd in
+ let input = Xmlm.make_input (`Channel ic) in
+ Rrd.from_xml input
(** Helper function - path is the path to the file _without the extension .gz_ *)
-let read_gzipped_file_with_fallback path =
+let rrd_of_gzip path =
let gz_path = path ^ ".gz" in
let gz_exists = try Unix.stat gz_path; true with _ -> false in
let result = ref "" in
if gz_exists then begin
Unixext.with_file gz_path [ Unix.O_RDONLY ] 0o0
- (fun fd -> Gzip.decompress_passive fd
- (fun fd -> result := Unixext.string_of_fd fd));
- !result
+ (fun fd -> Gzip.decompress_passive fd rrd_of_fd)
end else begin
(* If this fails, let the exception propagate *)
- Unixext.string_of_file path
+ Unixext.with_file path [ Unix.O_RDONLY ] 0 rrd_of_fd
end
let rrd = Rrd.rrd_create dss rras (Int64.of_int step) (Unix.gettimeofday()) in
rrd
-let send_rrd address to_archive uuid body =
+let send_rrd address to_archive uuid rrd =
debug "Sending RRD for object uuid=%s archiving=%b to address: %s" uuid to_archive address;
let pool_secret = !Xapi_globs.pool_secret in
- let length = Int64.of_int (String.length body) in
- let headers = Xapi_http.http_request ~cookie:[ "pool_secret", pool_secret ] ~length
+ let headers = Xapi_http.http_request_request ~cookie:[ "pool_secret", pool_secret ]
Http.Put address (Constants.rrd_put_uri^"?uuid="^uuid^(if to_archive then "&archive=true" else "")) in
let st_proc = Xmlrpcclient.get_reusable_stunnel
~write_to_log:Xmlrpcclient.write_to_log address Xapi_globs.default_ssl_port in
(fun () ->
try
debug "Sending rrd to %s" address;
- let _, _ = Xmlrpcclient.http_rpc_fd fd headers body in
+ Http_client.rpc fd headers "" (fun _ fd -> Rrd.to_fd rrd fd);
debug "Sent"
with e ->
debug "Caught exception: %s" (ExnHelper.string_of_exn e);
(fun () -> Stunnel.disconnect st_proc)
(* never called with the mutex held *)
-let archive_rrd ?(save_stats_locally = Pool_role.is_master ()) uuid body =
+let archive_rrd ?(save_stats_locally = Pool_role.is_master ()) uuid rrd =
debug "Archiving RRD for object uuid=%s %s" uuid (if save_stats_locally then "to local disk" else "to remote master");
if save_stats_locally then begin
try
Unixext.mkdir_safe Xapi_globs.xapi_rrd_location 0o755;
let base_filename = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in
Unixext.atomic_write_to_file (base_filename ^ ".gz") 0o644
- (fun fd ->
- Gzip.compress fd (fun fd ->
- let len = String.length body in
- let written = Unix.write fd body 0 len in
- if written <> len then (failwith "Short write occured!")));
+ (fun fd -> Gzip.compress fd (Rrd.to_fd rrd));
Unixext.unlink_safe base_filename (* If there's an uncompressed one hanging around, remove it *)
end else begin
debug "No local storage: not persisting RRDs"
(* Stream it to the master to store, or maybe to a host in the migrate case *)
let master_address = Pool_role.get_master_address () in
debug "About to send";
- send_rrd master_address true uuid body
+ send_rrd master_address true uuid rrd
end
(* This is where we add the update hook that updates the metrics classes every
match !host_rrd with
| Some rrdi ->
debug "sending host RRD to master";
- let rrd = Mutex.execute mutex (fun () -> Rrd.to_string rrdi.rrd) in
+ let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd rrdi.rrd) in
archive_rrd (Helpers.get_localhost_uuid ()) ~save_stats_locally:false rrd
| None -> ()
Mutex.unlock mutex;
List.iter (fun (uuid,rrd) ->
debug "Backup: saving RRD for VM uuid=%s to local disk" uuid;
- let rrd = Mutex.execute mutex (fun () -> Rrd.to_string rrd) in
+ let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd rrd) in
archive_rrd uuid ~save_stats_locally rrd)
vrrds;
match !host_rrd with
| Some rrdi ->
debug "Backup: saving RRD for host to local disk";
- let rrd = Mutex.execute mutex (fun () -> Rrd.to_string rrdi.rrd) in
+ let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd rrdi.rrd) in
archive_rrd (Helpers.get_localhost_uuid ()) ~save_stats_locally rrd
| None -> ()
end else begin
Hashtbl.remove vm_rrds vm_uuid;
rrdi)
in
- send_rrd address false vm_uuid (Rrd.to_string rrdi.rrd)
+ send_rrd address false vm_uuid rrdi.rrd
let push_rrd ~__context uuid =
try
let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in
- let body = read_gzipped_file_with_fallback path in
+ let rrd = rrd_of_gzip path in
debug "Pushing RRD for VM uuid=%s" uuid;
let vm = Db.VM.get_by_uuid ~__context ~uuid in
let host = Db.VM.get_resident_on ~__context ~self:vm in
if host = Helpers.get_localhost ~__context then begin
- let input = Xmlm.make_input (`String (0, body)) in
- let rrd = Rrd.from_xml input in
Mutex.execute mutex (fun () -> Hashtbl.replace vm_rrds uuid {rrd=rrd; dss=[]})
end else begin
(* Host might be OpaqueRef:null, in which case we'll fail silently *)
let address = Db.Host.get_address ~__context ~self:host in
- send_rrd address false uuid body
+ send_rrd address false uuid (Rrd.copy_rrd rrd)
end
with _ -> ()
(* Load an RRD from the local filesystem. Will return an RRD or throw an exception. *)
let load_rrd_from_local_filesystem ~__context uuid =
- let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in
- let body = read_gzipped_file_with_fallback path in
debug "Loading RRD from local filesystem for object uuid=%s" uuid;
- let input = Xmlm.make_input (`String (0, body)) in
- Rrd.from_xml input
+ let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in
+ rrd_of_gzip path
(* Fetch an RRD from the master *)
let pull_rrd_from_master ~__context uuid is_host =
with _ -> ()
(** Receive handler, for RRDs being pushed onto us *)
-exception Uuid_not_found
exception Invalid_RRD
let receive_handler (req: Http.request) (bio: Buf_io.t) =
+ debug "Monitor_rrds.receive_handler";
let query = req.Http.query in
req.Http.close <- true;
- debug "RRD receive handler";
+ let fd = Buf_io.fd_of bio in (* fd only used for writing *)
if not(List.mem_assoc "uuid" query) then begin
error "HTTP request for RRD lacked 'uuid' parameter";
- failwith "Bad request"
+ Http_svr.headers fd Http.http_400_badrequest;
+ failwith "Monitor_rrds.receive_handler: Bad request"
end;
- let fd = Buf_io.fd_of bio in (* fd only used for writing *)
Xapi_http.with_context ~dummy:true "Receiving VM rrd" req fd
(fun __context ->
- try
let uuid = List.assoc "uuid" query in
(* Check to see if it's a valid uuid for a host or VM *)
with _ -> begin
try ignore(Db.Host.get_by_uuid ~__context ~uuid); Host
with _ ->
- raise Uuid_not_found
+ Http_svr.headers fd Http.http_404_missing;
+ failwith (Printf.sprintf "Monitor_rrds.receive_handler: UUID %s neither host nor VM" uuid)
end
end
in
+ (* Tell client we're good to receive *)
+ Http_svr.headers fd (Http.http_200_ok ());
(* Now we know what sort of RRD it is, read it in and validate it *)
- let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rrd_size req bio in
-
- let input = Xmlm.make_input (`String (0, body)) in
- let rrd = try Rrd.from_xml input with _ -> raise Invalid_RRD in
+ let rrd = rrd_of_fd fd in
(* By now we know it's a valid RRD *)
let to_archive = List.mem_assoc "archive" query in
end else begin
debug "Receiving RRD for archiving, type=%s"
(match ty with Host -> "Host" | VM uuid -> Printf.sprintf "VM uuid=%s" uuid | _ -> "Unknown");
- archive_rrd uuid body
+ archive_rrd uuid (Rrd.copy_rrd rrd)
end;
- Http_svr.headers fd (Http.http_200_ok ())
- with
- | Invalid_RRD ->
- Http_svr.headers fd (Http.http_400_badrequest)
- | Uuid_not_found ->
- Http_svr.headers fd (Http.http_400_badrequest)
+
)
(** Send handler, for sending out requested RRDs *)
let handler (req: Http.request) s =
+ debug "Monitor_rrds.handler";
let query = req.Http.query in
req.Http.close <- true;
- debug "RRD handler";
if not(List.mem_assoc "ref" query) && not(List.mem_assoc "uuid" query) then begin
error "HTTP request for RRD lacked 'uuid' parameter";
failwith "Bad request"
let uuid = List.assoc "uuid" query in
(* If the uuid is in our hashtbl, we own the data and we'll just respond with it *)
try
- let xml = Mutex.execute mutex (fun () ->
- let rrd = Hashtbl.find vm_rrds uuid in
- Rrd.to_string rrd.rrd)
- in
- Http_svr.headers s
- (Http.http_200_ok_with_content
- (Int64.of_int (String.length xml))
- ~version:"HTTP/1.1" ~keep_alive:false ());
- ignore(Unix.write s xml 0 (String.length xml))
+ let rrd = Mutex.execute mutex (fun () -> Rrd.copy_rrd (Hashtbl.find vm_rrds uuid).rrd) in
+ Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ());
+ Rrd.to_fd rrd s
with
| Not_found ->
(* If we're not the master, redirect to the master *)
(* it's off, and we're the master, so unarchive the rrd and
send it off (if it's there) *)
let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in
- let body = read_gzipped_file_with_fallback path in
- Http_svr.headers s (Http.http_200_ok_with_content
- (Int64.of_int (String.length body))
- ~version:"HTTP/1.1" ~keep_alive:false ());
- ignore(Unix.write s body 0 (String.length body))
+ let rrd = rrd_of_gzip path in
+ Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ());
+ Rrd.to_fd rrd s
end)
(** Send handler, for sending out requested host RRDs *)
let handler_host (req: Http.request) s =
+ debug "Monitor_rrds.handler_host";
let query = req.Http.query in
req.Http.close <- true;
- debug "RRD host handler";
Xapi_http.with_context ~dummy:true "Obtaining the Host RRD statistics" req s
(fun __context ->
(* This is only used by hosts when booting - not for public use! *)
end else begin
let uuid = List.assoc "uuid" query in
let path = Xapi_globs.xapi_rrd_location ^ "/" ^ uuid in
- let body = read_gzipped_file_with_fallback path in
- Http_svr.headers s (Http.http_200_ok_with_content
- (Int64.of_int (String.length body))
- ~version:"HTTP/1.1" ~keep_alive:false ());
- ignore(Unix.write s body 0 (String.length body))
+ let rrd = rrd_of_gzip path in
+ Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ());
+ Rrd.to_fd rrd s;
end
end else begin
- let xml = Mutex.execute mutex
+ let rrd = Mutex.execute mutex
(fun () ->
debug "Received request for Host RRD";
- let rrd = match !host_rrd with Some rrdi -> rrdi.rrd | None -> failwith "No host RRD available" in
- let json = List.mem_assoc "json" query in
- Rrd.to_string rrd ~json) in
- Http_svr.headers s
- (Http.http_200_ok_with_content
- (Int64.of_int (String.length xml))
- ~version:"HTTP/1.1" ~keep_alive:false ());
- ignore(Unix.write s xml 0 (String.length xml))
+ Rrd.copy_rrd (match !host_rrd with Some rrdi -> rrdi.rrd | None -> failwith "No host RRD available")
+ ) in
+ Http_svr.headers s (Http.http_200_ok ~version:"1.0" ~keep_alive:false ());
+ Rrd.to_fd ~json:(List.mem_assoc "json" query) rrd s
end)
Rrd.export ~json prefixandrrds start interval cfopt)
let handler_rrd_updates (req: Http.request) s =
+ (* This is commonly-called: not worth logging *)
let query = req.Http.query in
req.Http.close <- true;
Xapi_http.with_context ~dummy:true "Obtaining the Host RRD statistics" req s
let headers =
(Http.http_200_ok_with_content
(Int64.of_int (String.length xml))
- ~version:"HTTP/1.1" ~keep_alive:false ())
+ ~version:"1.1" ~keep_alive:false ())
in
let headers = if json then headers else (headers@["Content-Type: text/xml"]) in
Http_svr.headers s headers;
)
in
- List.iter (fun (uuid,rrd) -> debug "Sending back RRD for VM uuid=%s" uuid; archive_rrd uuid (Rrd.to_string rrd.rrd)) to_send_back
+ List.iter (fun (uuid,rrd) -> debug "Sending back RRD for VM uuid=%s" uuid; archive_rrd uuid rrd.rrd) to_send_back
let query_possible_dss rrdi =
*)
open Listext
+open Pervasiveext
(* We're not currently printing any debug data in this module. This is commented out
so that we can link a standalone binary with this without bringing in logs
mutable rra_updatehook : (rrd -> int -> unit) option; (** Hook that gets called when an update happens *)
}
-
(** DS - a data source
This defines how we deal with incoming data. Type is one of:
rrd_rras: rra array;
}
+let copy_cdp_prep x =
+{
+ cdp_value = x.cdp_value;
+ cdp_unknown_pdps = x.cdp_unknown_pdps;
+}
+
+let copy_rra x =
+{
+ rra_cf = x.rra_cf;
+ rra_row_cnt = x.rra_row_cnt;
+ rra_pdp_cnt = x.rra_pdp_cnt;
+ rra_xff = x.rra_xff;
+ rra_data = Array.map Fring.copy x.rra_data;
+ rra_cdps = Array.map copy_cdp_prep x.rra_cdps;
+ rra_updatehook = x.rra_updatehook
+}
+
+
+let copy_ds x =
+{
+ ds_name = x.ds_name; (* not mutable *)
+ ds_ty = x.ds_ty;
+ ds_min = x.ds_min;
+ ds_max = x.ds_max;
+ ds_mrhb = x.ds_mrhb;
+ ds_last = x.ds_last;
+ ds_value = x.ds_value;
+ ds_unknown_sec = x.ds_unknown_sec;
+}
+
+let copy_rrd x =
+{
+ last_updated = x.last_updated;
+ timestep = x.timestep;
+ rrd_dss = Array.map copy_ds x.rrd_dss;
+ rrd_rras = Array.map copy_rra x.rrd_rras;
+}
+
(** Helper function to get the start time and age of the current/last PDP *)
let get_times time timestep =
let starttime = Int64.mul timestep (Int64.div (Int64.of_float time) timestep) in
inner rrd n) rrd removals_required
| _ -> failwith "Bad xml!"
-let to_xml output rrd =
- let tag n next () =
+(** Repeatedly call [f string] where [string] contains a fragment of the RRD XML *)
+let xml_to_fd rrd fd =
+ (* We use an output channel for Xmlm-compat buffered output. Provided we flush
+ at the end we should be safe. *)
+ let with_out_channel_output fd f =
+ let oc = Unix.out_channel_of_descr fd in
+ finally
+ (fun () ->
+ let output = Xmlm.make_output (`Channel oc) in
+ f output
+ )
+ (fun () -> flush oc) in
+
+ let tag n next output =
Xmlm.output output (`El_start (("",n),[]));
- List.iter (fun x -> x ()) next;
+ List.iter (fun x -> x output) next;
Xmlm.output output (`El_end)
in
- let tags n next () =
- List.iter (fun next -> tag n next ()) next
+ let tags n next output =
+ List.iter (fun next -> tag n next output) next
in
- let data dat () = Xmlm.output output (`Data dat) in
+ let data dat output = Xmlm.output output (`Data dat) in
let do_dss ds_list =
[tags "ds" (List.map (fun ds -> [
tag "database" (do_database rra.rra_data)]) rra_list)]
in
- Xmlm.output output (`Dtd None);
- tag "rrd"
- (List.concat
- [[tag "version" [data "0003"];
- tag "step" [data (Int64.to_string rrd.timestep)];
- tag "lastupdate" [data (Printf.sprintf "%Ld" (Int64.of_float (rrd.last_updated)))]];
- do_dss (Array.to_list rrd.rrd_dss);
- do_rras (Array.to_list rrd.rrd_rras);
- ]) ()
-
-let to_json rrd =
+ with_out_channel_output fd
+ (fun output ->
+ Xmlm.output output (`Dtd None);
+ tag "rrd"
+ (List.concat
+ [[tag "version" [data "0003"];
+ tag "step" [data (Int64.to_string rrd.timestep)];
+ tag "lastupdate" [data (Printf.sprintf "%Ld" (Int64.of_float (rrd.last_updated)))]];
+ do_dss (Array.to_list rrd.rrd_dss);
+ do_rras (Array.to_list rrd.rrd_rras);
+ ]) output
+ )
+
+let json_to_fd rrd fd =
+ let write x = if Unix.write fd x 0 (String.length x) <> String.length x then failwith "json_to_fd: short write" in
+
let do_dss ds_list =
"ds:["^(String.concat "," (List.map (fun ds ->
"{name:\""^ds.ds_name^"\",type:\""^(match ds.ds_ty with Gauge -> "GAUGE" | Absolute -> "ABSOLUTE" | Derive -> "DERIVE")^
"},database:"^(do_database rra.rra_data)) rra_list))^"}]"
in
- "{version: \"0003\",step:"^(Int64.to_string rrd.timestep)^",lastupdate:"^
- (f_to_s rrd.last_updated)^","^(do_dss (Array.to_list rrd.rrd_dss))^","^
- do_rras (Array.to_list rrd.rrd_rras)^"}"
+ write "{version: \"0003\",step:";
+ write (Int64.to_string rrd.timestep);
+ write ",lastupdate:";
+ write (f_to_s rrd.last_updated);
+ write ",";
+ write (do_dss (Array.to_list rrd.rrd_dss));
+ write ",";
+ write (do_rras (Array.to_list rrd.rrd_rras)^"}") (* XXX need to split this *)
+
+let iter_to_string_list f x =
+ let acc = ref [] in
+ f x (fun string -> acc := string :: !acc);
+ List.rev !acc
+
+(*
+(* XXX: we copy and return to avoid holding locks: this is why we aren't exposing
+ an iter/fold interface here. It would be better to copy the original (compact)
+ data then top copy the expanded version. *)
+let to_bigbuffer ?(json=false) rrd =
+ let b = Bigbuffer.make () in
+ begin
+ if json
+ then Bigbuffer.append_string b (to_json rrd) (* XXX: might be too big *)
+ else iter_over_xml rrd (Bigbuffer.append_string b)
+ end;
+ b
+*)
+
+let to_fd ?(json=false) rrd fd = (if json then json_to_fd else xml_to_fd) rrd fd
-let to_string ?(json=false) rrd =
- if json then
- to_json rrd
- else
- (let buffer = Buffer.create 10 in
- let output = Xmlm.make_output (`Buffer buffer) in
- to_xml output rrd;
- Buffer.contents buffer)
(** WARNING WARNING: Do not call the following function from within xapi! *)
let text_export rrd grouping =