end
+(** Write a block of checksummed data of length [len] with name [filename] to [ofd] *)
+let write_block ~__context filename buffer ofd len =
+ let hdr = Tar.Header.make filename (Int64.of_int len) in
+
+ try
+ let csum = Sha1sum.sha1sum
+ (fun checksumfd ->
+ Tar.write_block hdr (fun ofd -> Tar.Archive.multicast_n_string buffer
+ [ ofd; checksumfd ] len) ofd
+ ) in
+ (* Write the checksum as a separate file *)
+ let hdr' = Tar.Header.make (filename ^ checksum_extension) (Int64.of_int (String.length csum)) in
+ Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 0 (String.length csum))) ofd
+ with
+ Unix.Unix_error (a,b,c) as e ->
+ if TaskHelper.is_cancelling ~__context
+ then raise (Api_errors.Server_error (Api_errors.task_cancelled, []))
+ else
+ (if b="write"
+ then raise (Api_errors.Server_error (Api_errors.client_error, [ExnHelper.string_of_exn e]))
+ else raise e)
(** Stream a set of VDIs split into chunks in a tar format in a defined order. Return an
let progress = new_progress_record __context prefix_vdis in
+ (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *)
+ let last_transmission_time = ref 0. in
+
let send_one ofd (__context:Context.t) (prefix, vdi_ref, size) =
let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in
- let buffer = String.make (Int64.to_int chunk_size) '\000' in
with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644
(fun ifd ->
-
+
+
(* NB. It used to be that chunks could be larger than a native int *)
(* could handle, but this is no longer the case! Ensure all chunks *)
(* are strictly less than 2^30 bytes *)
let rec stream_from (chunk_no: int) (offset: int64) =
refresh_session ();
-
let remaining = Int64.sub size offset in
if remaining > 0L
then
begin
let this_chunk = (min remaining chunk_size) in
- let last_chunk = this_chunk=remaining in
+ let last_chunk = this_chunk = remaining in
let this_chunk = Int64.to_int this_chunk in
let filename = Printf.sprintf "%s/%08d" prefix chunk_no in
let hdr = Tar.Header.make filename (Int64.of_int this_chunk) in
- Unixext.really_read ifd buffer 0 this_chunk;
-
- (* Only write the chunk if it's not all zeros, or if it's the first *)
- if not (Zerocheck.is_all_zeros buffer this_chunk) || chunk_no=0 || last_chunk then
- begin
- let csum = Sha1sum.sha1sum
- (fun checksumfd ->
- try
- Tar.write_block hdr (fun ofd -> Tar.Archive.multicast_n_string buffer
- [ ofd; checksumfd ] this_chunk) ofd
- with
- Unix.Unix_error (a,b,c) as e ->
- if TaskHelper.is_cancelling ~__context
- then raise (Api_errors.Server_error (Api_errors.task_cancelled, []))
- else
- (if b="write"
- then raise (Api_errors.Server_error (Api_errors.client_error, [ExnHelper.string_of_exn e]))
- else raise e)
- ) in
+
+ let now = Unix.gettimeofday () in
+ let time_since_transmission = now -. !last_transmission_time in
- (* Write the checksum as a separate file *)
- let hdr' = Tar.Header.make (filename ^ checksum_extension) (Int64.of_int (String.length csum)) in
- Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 0 (String.length csum))) ofd
- end;
-
- made_progress __context progress (Int64.of_int this_chunk);
- stream_from (chunk_no + 1) (Int64.add offset chunk_size)
- end;
+ (* We always include the first and last blocks *)
+ let first_or_last = chunk_no = 0 || last_chunk in
+
+ if time_since_transmission > 5. && not first_or_last then begin
+ last_transmission_time := now;
+ write_block ~__context filename "" ofd 0;
+ (* no progress has been made *)
+ stream_from (chunk_no + 1) offset
+ end else begin
+ let buffer = String.make this_chunk '\000' in
+ Unixext.really_read ifd buffer 0 this_chunk;
+ if not (Zerocheck.is_all_zeros buffer this_chunk) || first_or_last then begin
+ last_transmission_time := now;
+ write_block ~__context filename buffer ofd this_chunk;
+ end;
+ made_progress __context progress (Int64.of_int this_chunk);
+ stream_from (chunk_no + 1) (Int64.add offset chunk_size);
+ end
+ end
in
stream_from 0 0L);
debug "Finished streaming VDI" in