open Pervasiveext
open Stringext
open Listext
-open Zerocheck
let ( +* ) = Int64.add
let ( -* ) = Int64.sub
let quantum = 16384 in
((x + quantum + quantum - 1) / quantum) * quantum
-(** The copying routine has inputs and outputs which both look like a
- Unix file-descriptor *)
-module type IO = sig
- type t
- val op: t -> int64 -> substring -> unit
+(** The copying routine can operate on anything which looks like a file-descriptor/Stream *)
+module type Stream = sig
+ type stream
+ val read: stream -> int64 -> string -> int -> int -> unit
+ val write: stream -> int64 -> string -> int -> int -> int
end
-
-(* [partition_into_blocks (s, len) skip f initial] applies contiguous (start, length) pairs to
+
+(* [fold_over_blocks (s, len) skip f initial] applies contiguous (start, length) pairs to
[f] starting at [s] up to maximum length [len] where each pair is as large as possible
up to [skip]. *)
-let partition_into_blocks (s, len) skip f initial =
+let fold_over_blocks (s, len) skip f initial =
let rec inner offset acc =
if offset = s +* len then acc
else
}
(** Perform the data duplication ("DD") *)
-module DD(Input : IO)(Output : IO) = struct
- let fold bat sparse input_op blocksize size f initial =
- let buf = String.create (Int64.to_int blocksize) in
- let do_block acc (offset, this_chunk) =
- input_op offset { buf = buf; offset = 0; len = Int64.to_int this_chunk };
- begin match sparse with
- | Some zero -> fold_over_nonzeros buf (Int64.to_int this_chunk) roundup (f offset) acc
- | None -> f offset acc { buf = buf; offset = 0; len = Int64.to_int this_chunk }
- end in
- (* For each entry from the BAT, copy it as a sequence of sub-blocks *)
- Bat.fold_left (fun acc b -> partition_into_blocks b blocksize do_block acc) initial bat
-
+module DD(S : Stream) = struct
(** [copy progress_cb bat sparse src dst size] copies blocks of data from [src] to [dst]
where [bat] represents the allocated / dirty blocks in [src];
- where if prezeroed is true it means do scan for and skip over blocks of \000
+ where if sparse is None it means don't scan for and skip over blocks of zeroes in [src]
+ where if sparse is (Some c) it means do scan for and skip over blocks of 'c' in [src]
while calling [progress_cb] frequently to report the fraction complete
*)
- let copy progress_cb bat prezeroed src dst blocksize size =
- (* If [prezeroed] then nothing needs wiping; otherwise we wipe not(bat) *)
- let empty = Bat.of_list [] and full = Bat.of_list [0L, size] in
- let bat = Opt.default full bat in
- let bat' = if prezeroed then empty else Bat.difference full bat in
- let sizeof bat = Bat.fold_left (fun total (_, size) -> total +* size) 0L bat in
- let total_work = sizeof bat +* (sizeof bat') in
- let stats = { writes = 0; bytes = 0L } in
- let with_stats f offset stats substr =
- f offset stats substr;
- let stats' = { writes = stats.writes + 1; bytes = stats.bytes +* (Int64.of_int substr.len) } in
- progress_cb (Int64.to_float stats'.bytes /. (Int64.to_float total_work));
- stats' in
- let copy offset stats substr =
- Output.op dst (offset +* (Int64.of_int substr.offset)) substr in
- let input_zero offset { buf = buf; offset = offset; len = len } =
- for i = 0 to len - 1 do
- buf.[offset + i] <- '\000'
- done in
- (* Do any necessary pre-zeroing then do the real work *)
- let sparse = if prezeroed then Some '\000' else None in
- fold bat sparse (Input.op src) blocksize size (with_stats copy)
- (fold bat' sparse input_zero blocksize size (with_stats copy) stats)
+ let copy progress_cb bat sparse (src: S.stream) (dst: S.stream) size =
+ let buf = String.create (Int64.to_int blocksize) in
+ let do_block stats (offset, this_chunk) : stats =
+ progress_cb ((Int64.to_float offset) /. (Int64.to_float size));
+ S.read src offset buf 0 (Int64.to_int this_chunk);
+ let write_extent stats (s, e) =
+ let n = S.write dst (offset +* (Int64.of_int s)) buf s e in
+ if n < e then raise (ShortWrite(s, e, n));
+ { stats with writes = stats.writes + 1; bytes = stats.bytes +* (Int64.of_int n) }
+ in
+ begin match sparse with
+ | Some zero -> Zerocheck.fold_over_nonzeros buf (Int64.to_int this_chunk) roundup write_extent stats
+ | None -> write_extent stats (0, Int64.to_int this_chunk)
+ end in
+ (* For each entry from the BAT, copy it as a sequence of sub-blocks *)
+ Bat.fold_left (fun stats b -> fold_over_blocks b blocksize do_block stats) { writes = 0; bytes = 0L } bat
end
-let blit src srcoff dst dstoff len =
- (* Printf.printf "[%s](%d) -> [%s](%d) %d\n" "?" srcoff "?" dstoff len; *)
- String.blit src srcoff dst dstoff len
-
-module String_reader = struct
- type t = string
- let op str stream_offset { buf = buf; offset = offset; len = len } =
- blit str (Int64.to_int stream_offset) buf offset len
-end
-module String_writer = struct
- type t = string
- let op str stream_offset { buf = buf; offset = offset; len = len } =
- blit buf offset str (Int64.to_int stream_offset) len
+(* Helper function to always return a block of zeroes, like /dev/null *)
+let read_zeroes _ _ buf offset len =
+ for i = 0 to len - 1 do
+ buf.[offset + i] <- '\000'
+ done
+
+(** A Stream interface implemented over strings, useful for testing *)
+module String_stream = struct
+ type stream = string
+ let blit src srcoff dst dstoff len =
+ (* Printf.printf "[%s](%d) -> [%s](%d) %d\n" "?" srcoff "?" dstoff len; *)
+ String.blit src srcoff dst dstoff len
+ let read str stream_offset buf offset len =
+ blit str (Int64.to_int stream_offset) buf offset len
+ let write str stream_offset buf offset len =
+ blit buf offset str (Int64.to_int stream_offset) len;
+ len
end
(** A File interface implemented over open Unix files *)
-module File_reader = struct
- type t = Unix.file_descr
- let op stream stream_offset { buf = buf; offset = offset; len = len } =
+module File_stream = struct
+ type stream = Unix.file_descr
+
+ let read stream stream_offset buf offset len =
Unix.LargeFile.lseek stream stream_offset Unix.SEEK_SET;
Unixext.really_read stream buf offset len
-end
-module File_writer = struct
- type t = Unix.file_descr
- let op stream stream_offset { buf = buf; offset = offset; len = len } =
+ let write stream stream_offset buf offset len =
let newoff = Unix.LargeFile.lseek stream stream_offset Unix.SEEK_SET in
(* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" (String.length buf) offset len; *)
- let n = Unix.write stream buf offset len in
- if n < len
- then raise (ShortWrite(offset, len, n))
-end
-
-(** Marshals data across the network in chunks *)
-module Network_writer = struct
- open Sparse_encoding
- type t = Unix.file_descr
-
- type url = {
- host: string;
- port: int;
- auth: (string * string) option;
- uri: string;
- https: bool;
- }
-
- let url_of_string url =
- let host x = match String.split ':' x with
- | host :: _ -> host
- | _ -> failwith (Printf.sprintf "Failed to parse host: %s" x) in
- let port x = match String.split ':' x with
- | _ :: port :: _ -> Some (int_of_string port)
- | _ -> None in
- let uname_password_host_port x = match String.split '@' x with
- | [ _ ] -> None, host x, port x
- | [ uname_password; host_port ] ->
- begin match String.split ':' uname_password with
- | [ uname; password ] -> Some (uname, password), host host_port, port host_port
- | _ -> failwith (Printf.sprintf "Failed to parse authentication substring: %s" uname_password)
- end
- | _ -> failwith (Printf.sprintf "Failed to parse username password host and port: %s" x) in
- match String.split '/' url with
- | http_or_https :: "" :: x :: uri ->
- let uname_password, host, port = uname_password_host_port x in
- if not(List.mem http_or_https [ "https:"; "http:" ])
- then failwith (Printf.sprintf "Unknown URL scheme: %s" http_or_https);
- let https = String.startswith "https://" url in
- let port = (match port with Some p -> p | None -> if https then 443 else 80) in
- { host = host; port = port; auth = uname_password; uri = "/" ^ (String.concat "/" uri); https = https }
- | _ -> failwith (Printf.sprintf "Failed to parse URL: %s" url)
-
- let open_url url f =
- let with_ssl url f =
- Printf.printf "connecting to %s:%d\n" url.host url.port;
- let stunnel = Stunnel.connect url.host url.port in
- finally
- (fun () -> f stunnel.Stunnel.fd)
- (fun () -> Stunnel.disconnect stunnel) in
- let with_plaintext url f =
- let fd = Unixext.open_connection_fd url.host url.port in
- finally
- (fun () -> f fd)
- (fun () -> Unix.close fd) in
- let uri, query = Http.parse_uri url.uri in
- let request = { Http.m = Http.Put;
- uri = uri;
- query = query;
- version = "1.0";
- transfer_encoding = None;
- content_length = None;
- auth = Opt.map (fun (username, password) -> Http.Basic(username, password)) url.auth;
- cookie = [ "chunked", "true" ];
- task = None; subtask_of = None;
- content_type = None;
- user_agent = Some "sparse_dd/0.1";
- close = true;
- headers = [] } in
- try
- if url.https
- then with_ssl url (fun fd -> Http_client.rpc fd request "" f)
- else with_plaintext url (fun fd -> Http_client.rpc fd request "" f)
- with Http_client.Http_error("401") as e ->
- Printf.printf "HTTP 401 Unauthorized\n";
- raise e
-
- let op stream stream_offset { buf = buf; offset = offset; len = len } =
- let copy = String.create len in
- String.blit buf offset copy 0 len;
- let x = { Chunk.start = stream_offset; data = copy } in
- Chunk.marshal stream x
-
- let close stream = Chunk.marshal stream { Chunk.start = 0L; data = "" }
+ Unix.write stream buf offset len
end
(** An implementation of the DD algorithm over strings *)
-module String_copy = DD(String_reader)(String_writer)
+module String_copy = DD(String_stream)
+
+(** An implementatino of the DD algorithm which copies zeroes into strings *)
+module String_write_zero = DD(struct
+ include String_stream
+ let read = read_zeroes
+end)
-(** An implementation of the DD algorithm over Unix files *)
-module File_copy = DD(File_reader)(File_writer)
+(** An implementatino of the DD algorithm over Unix files *)
+module File_copy = DD(File_stream)
-(** An implementatino of the DD algorithm from Unix files to a Network socket *)
-module Network_copy = DD(File_reader)(Network_writer)
+(** An implementatin of the DD algorithm which copies zeroes into files *)
+module File_write_zero = DD(struct
+ include File_stream
+ let read = read_zeroes
+end)
(** [file_dd ?progress_cb ?size ?bat prezeroed src dst]
If [size] is not given, will assume a plain file and will use st_size from Unix.stat.
If [prezeroed] is false, will first explicitly write zeroes to all blocks not in [bat].
Will then write blocks from [src] into [dst], using the [bat]. If [prezeroed] will additionally
- scan for zeroes within the allocated blocks.
- If [dst] has the format:
- fd:X
- then data is written directly to file descriptor X in a chunked encoding. Otherwise
- it is written directly to the file referenced by [dst].
- *)
+ scan for zeroes within the allocated blocks. *)
let file_dd ?(progress_cb = (fun _ -> ())) ?size ?bat prezeroed src dst =
let size = match size with
| None -> (Unix.LargeFile.stat src).Unix.LargeFile.st_size
| Some x -> x in
let ifd = Unix.openfile src [ Unix.O_RDONLY ] 0o600 in
- if String.startswith "http:" dst || String.startswith "https:" dst then begin
- (* Network copy *)
- Network_writer.open_url (Network_writer.url_of_string dst)
- (fun _ ofd ->
- Printf.printf "\nWriting chunked encoding to fd: %d\n" (Unixext.int_of_file_descr ofd);
- let stats = Network_copy.copy progress_cb bat prezeroed ifd ofd blocksize size in
- Printf.printf "\nSending final chunk\n";
- Network_writer.close ofd;
- Printf.printf "Waiting for connection to close\n";
- (try let tmp = " " in Unixext.really_read ofd tmp 0 1 with End_of_file -> ());
- Printf.printf "Connection closed\n";
- stats)
- end else begin
- let ofd = Unix.openfile dst [ Unix.O_WRONLY; Unix.O_CREAT ] 0o600 in
- (* Make sure the output file has the right size *)
- Unix.LargeFile.lseek ofd (size -* 1L) Unix.SEEK_SET;
- Unix.write ofd "\000" 0 1;
- Unix.LargeFile.lseek ofd 0L Unix.SEEK_SET;
- Printf.printf "Copying\n";
- File_copy.copy progress_cb bat prezeroed ifd ofd blocksize size
- end
+ let ofd = Unix.openfile dst [ Unix.O_WRONLY; Unix.O_CREAT ] 0o600 in
+ (* Make sure the output file has the right size *)
+ Unix.LargeFile.lseek ofd (size -* 1L) Unix.SEEK_SET;
+ Unix.write ofd "\000" 0 1;
+ Unix.LargeFile.lseek ofd 0L Unix.SEEK_SET;
+ let full_bat = Bat.of_list [0L, size] in
+ let empty_bat = Bat.of_list [] in
+ let bat = Opt.default full_bat bat in
+ (* If not prezeroed then:
+ 1. explicitly write zeroes into the complement of the BAT;
+ 2. don't scan and skip zeroes in the source disk *)
+ let bat' = if prezeroed
+ then empty_bat
+ else Bat.difference full_bat bat in
+ let progress_cb_zero, progress_cb_copy =
+ (fun fraction -> progress_cb (0.5 *. fraction)),
+ (fun fraction -> progress_cb (0.5 *. fraction +. 0.5)) in
+ Printf.printf "Wiping\n";
+ File_write_zero.copy progress_cb_zero bat' None ifd ofd size;
+ Printf.printf "Copying\n";
+ File_copy.copy progress_cb_copy bat (if prezeroed then Some '\000' else None) ifd ofd size
(** [make_random size zero nonzero] returns a string (of size [size]) and a BAT. Blocks not in the BAT
are guaranteed to be [zero]. Blocks in the BAT are randomly either [zero] or [nonzero]. *)
let offset' = min size (offset + bs) in
offset', if bit then (offset, offset' - offset) :: acc else acc) (0, []) bits) in
let bat = Bat.of_list (List.map (fun (x, y) -> Int64.of_int x, Int64.of_int y) bat) in
- result, Some bat
+ result, bat
(** [test_dd (input, bat) ignore_bat prezeroed zero nonzero] uses the DD algorithm to make a copy of
the string [input].
*)
let test_dd (input, bat) ignore_bat prezeroed zero nonzero =
let size = String.length input in
- let blocksize = Int64.of_int (size / 100) in
let output = String.make size (if prezeroed then zero else nonzero) in
try
- let stats = String_copy.copy (fun _ -> ()) bat prezeroed input output blocksize (Int64.of_int size) in
+ let full_bat = Bat.of_list [0L, Int64.of_int size] in
+ let empty_bat = Bat.of_list [] in
+ let bat = if ignore_bat then full_bat else bat in
+ (* If not prezeroed then:
+ 1. explicitly write zeroes into the complement of the BAT;
+ 2. don't scan and skip zeroes in the source disk *)
+ let bat' = if prezeroed
+ then empty_bat
+ else Bat.difference full_bat bat in
+ String_write_zero.copy (fun _ -> ()) bat' None input output (Int64.of_int size);
+ let stats = String_copy.copy (fun _ -> ()) bat (if prezeroed then Some zero else None) input output (Int64.of_int size) in
assert (String.compare input output = 0);
stats
with e ->
(** Generates lots of random strings and makes copies with the DD algorithm, checking that the copies are identical *)
let test_lots_of_strings () =
- let n = 1000 and m = 100000 in
+ let n = 1000 and m = 1000 in
let writes = ref 0 and bytes = ref 0L in
for i = 0 to n do
if i mod 100 = 0 then (Printf.printf "i = %d\n" i; flush stdout);
let backend = xs.Xs.read (Printf.sprintf "device/vbd/%d/backend" id) in
let params = xs.Xs.read (Printf.sprintf "%s/params" backend) in
match String.split '/' backend with
- | "" :: "local" :: "domain" :: bedomid :: _ ->
+ | "local" :: "domain" :: bedomid :: _ ->
assert (self = bedomid);
Some params
| _ -> raise Not_found
match Tapctl.of_device (Tapctl.create ()) path with
| _, _, (Some (_, vhd)) -> Some vhd
| _, _, _ -> raise Not_found
- with Tapctl.Not_blktap ->
- Printf.printf "Device %s is not controlled by blktap\n" path;
- None
- | Tapctl.Not_a_device ->
- Printf.printf "%s is not a device\n" path;
- None
- | _ ->
- Printf.printf "Device %s has an unknown driver\n" path;
- None in
- begin match find_underlying_tapdisk path with
- | Some path ->
- begin match tapdisk_of_path path with
- | Some vhd -> Some vhd
+ with Not_found -> None in
+ match tapdisk_of_path path with
+ | Some vhd -> Some vhd
+ | None ->
+ begin match find_underlying_tapdisk path with
+ | Some path ->
+ begin match tapdisk_of_path path with
+ | Some vhd -> Some vhd
+ | None -> None
+ end
| None -> None
end
- | None -> None
- end
-
-let deref_symlinks path =
- let rec inner seen_already path =
- if List.mem path seen_already
- then failwith "Circular symlink";
- let stats = Unix.lstat path in
- if stats.Unix.st_kind = Unix.S_LNK
- then inner (path :: seen_already) (Unix.readlink path)
- else path in
- inner [] path
-
-let with_rdonly_vhd path f =
- let h = Vhd._open path [ Vhd.Open_rdonly ] in
- finally
- (fun () -> f h)
- (fun () -> Vhd.close h)
-
-let parent_of_vhd vhd =
- let vhd' = deref_symlinks vhd in
- let parent = with_rdonly_vhd vhd' Vhd.get_parent in
- (* Make path absolute *)
- if String.length parent > 0 && String.startswith "./" parent
- then Filename.concat (Filename.dirname vhd') parent
- else parent
-
-let rec chain_of_vhd vhd =
- try
- let p = parent_of_vhd vhd in
- vhd :: (chain_of_vhd p)
- with (Failure "Disk is not a differencing disk") -> [ vhd ]
(** Given a vhd filename, return the BAT *)
let bat vhd =
- with_rdonly_vhd vhd
- (fun h ->
+ let h = Vhd._open vhd [ Vhd.Open_rdonly ] in
+ finally
+ (fun () ->
let b = Vhd.get_bat h in
let b' = List.map_tr (fun (s, l) -> 2L ** mib ** (Int64.of_int s), 2L ** mib ** (Int64.of_int l)) b in
Bat.of_list b')
+ (fun () -> Vhd.close h)
(* Record when the binary started for performance measuring *)
let start = Unix.gettimeofday ()
last_percent := new_percent
let _ =
- Stunnel.init_stunnel_path ();
- let base = ref None and src = ref None and dest = ref None and size = ref (-1L) and prezeroed = ref false and test = ref false in
- Arg.parse [ "-base", Arg.String (fun x -> base := Some x), "base disk to search for differences from (default: None)";
- "-src", Arg.String (fun x -> src := Some x), "source disk";
- "-dest", Arg.String (fun x -> dest := Some x), "destination disk";
+ let base = ref "" and src = ref "" and dest = ref "" and size = ref (-1L) and prezeroed = ref false and test = ref false in
+ Arg.parse [ "-base", Arg.Set_string base, "base disk to search for differences from (default: None)";
+ "-src", Arg.Set_string src, "source disk";
+ "-dest", Arg.Set_string dest, "destination disk";
"-size", Arg.String (fun x -> size := Int64.of_string x), "number of bytes to copy";
"-prezeroed", Arg.Set prezeroed, "assume the destination disk has been prezeroed";
"-machine", Arg.Set machine_readable, "emit machine-readable output";
(fun x -> Printf.fprintf stderr "Warning: ignoring unexpected argument %s\n" x)
(String.concat "\n" [ "Usage:";
Printf.sprintf "%s [-base x] [-prezeroed] <-src y> <-dest z> <-size s>" Sys.argv.(0);
- " -- copy <s> bytes from <y> to <z>.";
- " <x> and <y> are always interpreted as filenames. If <z> is a URL then the URL";
- " is opened and encoded chunks of data are written directly to it";
- " otherwise <z> is interpreted as a filename.";
- "";
- " If <-base x> is specified then only copy differences";
+ " -- copy <s> bytes from <y> to <z>. If <-base x> is specified then only copy differences";
" between <x> and <y>. If [-base x] is unspecified and [-prezeroed] is unspecified ";
" then assume the destination must be fully wiped.";
"";
test_lots_of_strings ();
exit 0
end;
- if !src = None || !dest = None || !size = (-1L) then begin
+ if !src = "" || !dest = "" || !size = (-1L) then begin
Printf.fprintf stderr "Must have -src -dest and -size arguments\n";
exit 1;
end;
- let empty = Bat.of_list [] in
- Printf.printf "src = %s; dest = %s; base = %s; size = %Ld\n" (Opt.default "None" !src) (Opt.default "None" !dest) (Opt.default "None" !base) !size;
- let size = Some !size in
-
- (** [chain_of_device device] returns [None] if [device] is None.
- If device is [Some d] then returns [None] if no vhds were detected or [Some chain] *)
- let chain_of_device device =
- let flatten = function
- | Some (Some x) -> Some x
- | Some None -> None
- | None -> None in
- let vhd : string option = flatten (Opt.map vhd_of_device device) in
- let chain : string list option = Opt.map chain_of_vhd vhd in
- let option y = Opt.default "None" (Opt.map (fun x -> "Some " ^ x) y) in
- Printf.printf "%s has chain: [ %s ]\n" (option device) (option (Opt.map (String.concat "; ") chain));
- chain in
-
- let bat : Bat.t option =
- try
- let src_chain = chain_of_device !src in
- let base_chain = chain_of_device !base in
-
- (* If the src_chain is None then we have no BAT information *)
- Opt.map
- (fun s ->
- let b = Opt.default [] base_chain in
- (* We need to copy blocks from: (base - src) + (src - base)
- ie. everything except for blocks from the shared nodes *)
- let unshared = List.set_difference b s @ (List.set_difference s b) in
- List.fold_left Bat.union empty (List.map bat unshared)
- ) src_chain
- with e ->
- Printf.printf "Caught exception: %s while calculating BAT. Ignoring all BAT information\n" (Printexc.to_string e);
- None in
+ let size = Some !size in
+ let src_vhd = vhd_of_device !src and dest_vhd = vhd_of_device !dest in
+ Printf.printf "auto-detect src vhd: %s\n" (Opt.default "None" (Opt.map (fun x -> "Some " ^ x) src_vhd));
+ let src_bat = Opt.map bat src_vhd in
progress_cb 0.;
- let stats = file_dd ~progress_cb ?size ?bat !prezeroed (Opt.unbox !src) (Opt.unbox !dest) in
+ let stats = file_dd ~progress_cb ?size ?bat:src_bat !prezeroed !src !dest in
Printf.printf "Time: %.2f seconds\n" (Unix.gettimeofday () -. start);
Printf.printf "\nNumber of writes: %d\n" stats.writes;
Printf.printf "Number of bytes: %Ld\n" stats.bytes