]> xenbits.xensource.com Git - xcp/xen-api.git/commitdiff
CA-39291: Work around firewalls which kill idle TCP connections by inserting a small...
authorDavid Scott <dave.scott@eu.citrix.com>
Fri, 9 Apr 2010 18:57:43 +0000 (19:57 +0100)
committerDavid Scott <dave.scott@eu.citrix.com>
Fri, 9 Apr 2010 18:57:43 +0000 (19:57 +0100)
The failure happens whenever a disk has a lot of zeroes in it: the TCP connection goes idle while the server is scanning for the next non-zero block. Even setting SO_KEEPALIVE on the stunnel sockets and reducing the window probe interval down to 30s didn't fix it. We wish to keep the ability to have a basic client do an export via HTTP GET so we can't add application-level keepalives to the protocol... we must add them to the export itself.

Note this change is backwards compatible. The receiver code expects:
* a common prefix
* a monotonically increasing chunk number
* the first and last blocks to be the same size and included verbatim (even if all zeroes)
* blocks of zeroes the same size as the first block represented as gaps in the increasing chunk number sequence

Therefore including extra files of length 0 in the stream will be ignored provided they
* share the common prefix and chunk numbering scheme
* are not the first or last blocks

Signed-off-by: David Scott <dave.scott@eu.citrix.com>
ocaml/xapi/stream_vdi.ml

index 59f9aa13c91015fcd0684791cf01d16aed16e9e9..46c4920d1076e01c5aec3f40035e6722c5a9313a 100644 (file)
@@ -86,6 +86,27 @@ let made_progress __context progress n =
   end
 
 
+(** Write a block of checksummed data of length [len] with name [filename] to [ofd] *)
+let write_block ~__context filename buffer ofd len = 
+  let hdr = Tar.Header.make filename (Int64.of_int len) in
+
+  try
+       let csum = Sha1sum.sha1sum
+         (fun checksumfd ->
+                  Tar.write_block hdr (fun ofd -> Tar.Archive.multicast_n_string buffer 
+                                                                       [ ofd; checksumfd ] len) ofd
+         ) in
+       (* Write the checksum as a separate file *)
+       let hdr' = Tar.Header.make (filename ^ checksum_extension) (Int64.of_int (String.length csum)) in
+       Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 0 (String.length csum))) ofd
+  with
+       Unix.Unix_error (a,b,c) as e ->
+               if TaskHelper.is_cancelling ~__context
+               then raise (Api_errors.Server_error (Api_errors.task_cancelled, []))
+               else 
+                 (if b="write" 
+                  then raise (Api_errors.Server_error (Api_errors.client_error, [ExnHelper.string_of_exn e]))
+                  else raise e)
 
 
 (** Stream a set of VDIs split into chunks in a tar format in a defined order. Return an
@@ -95,56 +116,53 @@ let send_all refresh_session ofd ~__context rpc session_id (prefix_vdis: vdi lis
 
   let progress = new_progress_record __context prefix_vdis in
 
+  (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *)
+  let last_transmission_time = ref 0. in
+
   let send_one ofd (__context:Context.t) (prefix, vdi_ref, size) = 
     let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in
-    let buffer = String.make (Int64.to_int chunk_size) '\000' in
 
     with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644
       (fun ifd ->
-        
+
+
         (* NB. It used to be that chunks could be larger than a native int *)
         (* could handle, but this is no longer the case! Ensure all chunks *)
         (* are strictly less than 2^30 bytes *)
         let rec stream_from (chunk_no: int) (offset: int64) = 
           refresh_session ();
-
           let remaining = Int64.sub size offset in
           if remaining > 0L
           then 
             begin
               let this_chunk = (min remaining chunk_size) in
-              let last_chunk = this_chunk=remaining in
+                  let last_chunk = this_chunk = remaining in
               let this_chunk = Int64.to_int this_chunk in
               let filename = Printf.sprintf "%s/%08d" prefix chunk_no in
               let hdr = Tar.Header.make filename (Int64.of_int this_chunk) in
-              Unixext.really_read ifd buffer 0 this_chunk;
-              
-              (* Only write the chunk if it's not all zeros, or if it's the first *)
-              if not (Zerocheck.is_all_zeros buffer this_chunk) || chunk_no=0 || last_chunk then
-                begin
-                  let csum = Sha1sum.sha1sum
-                    (fun checksumfd ->
-                      try
-                        Tar.write_block hdr (fun ofd -> Tar.Archive.multicast_n_string buffer 
-                          [ ofd; checksumfd ] this_chunk) ofd
-                      with
-                          Unix.Unix_error (a,b,c) as e ->
-                            if TaskHelper.is_cancelling ~__context
-                            then raise (Api_errors.Server_error (Api_errors.task_cancelled, []))
-                            else 
-                              (if b="write" 
-                              then raise (Api_errors.Server_error (Api_errors.client_error, [ExnHelper.string_of_exn e]))
-                                else raise e)
-                    ) in       
+
+                  let now = Unix.gettimeofday () in
+                  let time_since_transmission = now -. !last_transmission_time in
                   
-                  (* Write the checksum as a separate file *)
-                  let hdr' = Tar.Header.make (filename ^ checksum_extension) (Int64.of_int (String.length csum)) in
-                  Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 0 (String.length csum))) ofd
-                end;
-
-              made_progress __context progress (Int64.of_int this_chunk);
-              stream_from (chunk_no + 1) (Int64.add offset chunk_size)
-            end;
+                  (* We always include the first and last blocks *)
+                  let first_or_last = chunk_no = 0 || last_chunk in
+
+                  if time_since_transmission > 5. && not first_or_last then begin
+                        last_transmission_time := now;
+                        write_block ~__context filename "" ofd 0;
+                        (* no progress has been made *)
+                        stream_from (chunk_no + 1) offset
+                  end else begin
+                        let buffer = String.make this_chunk '\000' in
+                        Unixext.really_read ifd buffer 0 this_chunk;
+                        if not (Zerocheck.is_all_zeros buffer this_chunk) || first_or_last then begin
+                          last_transmission_time := now;
+                          write_block ~__context filename buffer ofd this_chunk;
+                        end;
+                        made_progress __context progress (Int64.of_int this_chunk);
+                        stream_from (chunk_no + 1) (Int64.add offset chunk_size);
+              end
+                end
         in
         stream_from 0 0L);
     debug "Finished streaming VDI" in