]> xenbits.xensource.com Git - xcp/xen-api.git/commitdiff
CA-43021: hook in 'sparse_dd' for improved VM.copy performance
authorDavid Scott <dave.scott@eu.citrix.com>
Mon, 23 Aug 2010 12:16:56 +0000 (13:16 +0100)
committerDavid Scott <dave.scott@eu.citrix.com>
Mon, 23 Aug 2010 12:16:56 +0000 (13:16 +0100)
On local LVHD, VM.copies of freshly installed guests are much quicker:

Guest        Previous VM.copy time New VM.copy time   Speedup
----------------------------------------------------------------
Debian Lenny   2:11                     1:18               40%
Windows 7      14:18                    7:57               44%

Signed-off-by: David Scott <dave.scott@eu.citrix.com>
ocaml/xapi/OMakefile
ocaml/xapi/import_raw_vdi.ml
ocaml/xapi/sm_fs_ops.ml
ocaml/xapi/sm_fs_ops.mli

index 1d281e731b26272e7f6d204effc04c6713584cc9..6cfcb3676625aecf6fe60105e2f4b896a08e1d3c 100644 (file)
@@ -135,6 +135,7 @@ XAPI_MODULES = $(COMMON) \
        config_file_io \
        slave_backup \
        sm_fs_ops \
+       sparse_dd_wrapper \
        vmopshelpers \
        vm_config \
        vmops \
index 0c2c615ccd375721a26b00ddcf82f1df288999b0..dfe64523a9d8fa158b2e1bcbbb0c1951c88b8251 100644 (file)
@@ -61,11 +61,8 @@ let localhost_handler rpc session_id (req: request) (s: Unix.file_descr) =
            
                Server_helpers.exec_with_new_task "VDI.import" 
                (fun __context -> 
-                Sm_fs_ops.with_block_attached_device __context rpc session_id vdi `RW
-                  (fun device ->
-                     let fd = Unix.openfile device  [ Unix.O_WRONLY ] 0 in
-                     finally 
-                       (fun () -> 
+                Sm_fs_ops.with_open_block_attached_device __context rpc session_id vdi `RW
+                  (fun fd ->
                           try
                             if chunked
                             then receive_chunks s fd
@@ -73,8 +70,6 @@ let localhost_handler rpc session_id (req: request) (s: Unix.file_descr) =
                             Unixext.fsync fd;
                           with Unix.Unix_error(Unix.EIO, _, _) ->
                             raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O errors"]))
-                       )
-                       (fun () -> Unix.close fd)
                   )
            );
            TaskHelper.complete ~__context [];
index 7d2f8400817291b170e177ed168c7b04af2ea9aa..20a4ec12f5ccae258c55972f879719a62a1187f7 100644 (file)
@@ -34,6 +34,19 @@ let with_block_attached_device __context rpc session_id vdi mode f =
        let vbd = List.hd vbds in
        f ("/dev/" ^ (Db.VBD.get_device ~__context ~self:vbd)))
 
+(** Block-attach a VDI to dom0, open the device and pass the file descriptor to [f] *)
+let with_open_block_attached_device __context rpc session_id vdi mode f = 
+       with_block_attached_device __context rpc session_id vdi mode
+       (fun path ->
+               let mode' = match mode with
+               | `RO -> [ Unix.O_RDONLY ]
+               | `RW -> [ Unix.O_RDWR ] in
+               let fd = Unix.openfile path mode' 0 in
+               finally
+               (fun () -> f fd)
+               (fun () -> Unix.close fd)
+       )
+
 (** Execute a function with a list of device paths after attaching a bunch of VDIs to dom0 *)
 let with_block_attached_devices (__context: Context.t) rpc (session_id: API.ref_session) (vdis: API.ref_VDI list) mode f = 
   let rec loop acc = function
@@ -41,35 +54,13 @@ let with_block_attached_devices (__context: Context.t) rpc (session_id: API.ref_
     | vdi :: vdis -> with_block_attached_device __context rpc session_id vdi mode (fun path -> loop (path :: acc) vdis) in
   loop [] vdis
 
-(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
-let with_import_vdi __context rpc session_id vdi f = 
-       let subtask_of = Context.get_task_id __context in
-       Server_helpers.exec_with_new_task "VDI.import" 
-       (fun __context -> 
-               (* Find a suitable host for the SR containing the VDI *)
-               let sr = Db.VDI.get_SR ~__context ~self:vdi in
-               let host = Importexport.find_host_for_sr ~__context sr in
-               let address = Db.Host.get_address ~__context ~self:host in
-               let importtask = Client.Task.create rpc session_id "VDI.import" "" in
-
-                       let headers = Xapi_http.http_request 
-                               ~cookie:(["session_id", Ref.string_of session_id;
-                                         "task_id", Ref.string_of importtask;
-                                         "vdi", Ref.string_of vdi;
-                                         "chunked", "true"])
-                               Http.Put address Constants.import_raw_vdi_uri in
-                       let writer _ _ sock = f sock; true in
-                       if not (Xmlrpcclient.do_secure_http_rpc ~use_stunnel_cache:false 
-                               ~task_id:(Ref.string_of (Context.get_task_id __context))
-                               ~host:address ~port:Xapi_globs.default_ssl_port ~headers ~body:"" writer)
-                       then failwith "with_import_vdi";
-                       debug "Waiting for import task (%s) to complete" (Ref.string_of importtask);
-                       (* wait for the task to complete before cleaning anything up *)
-                       while Client.Task.get_status rpc session_id importtask = `pending do
-                               Thread.delay 1.;
-                       done;
-                       Client.Task.destroy rpc session_id importtask;
-       )
+(** Return a URL suitable for passing to the sparse_dd process *)
+let import_vdi_url ~__context rpc session_id vdi = 
+        (* Find a suitable host for the SR containing the VDI *)
+        let sr = Db.VDI.get_SR ~__context ~self:vdi in
+        let host = Importexport.find_host_for_sr ~__context sr in
+        let address = Db.Host.get_address ~__context ~self:host in
+        Printf.sprintf "https://%s%s?vdi=%s&session_id=%s" address Constants.import_raw_vdi_uri (Ref.string_of vdi) (Ref.string_of session_id)
 
 (** Catch those smint exceptions and convert into meaningful internal errors *)
 let with_api_errors f x = 
@@ -167,91 +158,6 @@ let with_new_fs_vdi __context ~name_label ~name_description ~sR ~required_space
              raise e
          )
          
-exception Cancelled
-exception NonZero
-
-(** The copying routine can operate on anything which looks like a file-descriptor/Stream *)
-module type Stream = sig
-       type stream
-       val write: stream -> int64 -> string -> int -> int -> unit
-end
-
-(** Writes directly to a file *)
-module FileStream = struct
-       type stream = Unix.file_descr
-       let write stream stream_offset buf off len =
-               let newoff = Unix.LargeFile.lseek stream stream_offset Unix.SEEK_SET in
-               (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" (String.length buf) offset len; *)
-               let n = Unix.write stream buf off len in
-               if n < len then failwith "Short write"
-end
-
-(** Marshals data across the network in chunks *)
-module NetworkStream = struct
-       open Sparse_encoding
-       type stream = Unix.file_descr
-       let write stream stream_offset buf off len =
-               let copy = String.create len in
-               String.blit buf off copy 0 len;
-               let x = { Chunk.start = stream_offset; data = copy } in
-               Chunk.marshal stream x
-end
-
-module DD(Output: Stream) = struct
-
-(* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
-  let round v = int_of_float (v *. 50.0) in
-  let update = 
-    let oldvalue = ref (-1.0) in
-    fun value ->  
-      if round value <> round !oldvalue then begin
-       TaskHelper.exn_if_cancelling ~__context;
-       TaskHelper.operate_on_db_task ~__context 
-         (fun self -> 
-           Db.Task.set_progress ~__context ~self ~value);
-      end;
-      oldvalue := value
-  in
-
-  let buf = String.create bs in
-  
-  let allzero s n =
-    try
-      for i=0 to n-1 do
-        if s.[i] <> '\000' then raise NonZero
-      done;
-      true
-    with NonZero -> false
-  in
-
-  let rec do_block offset =
-    refresh_session ();
-
-    update ((Int64.to_float offset) /. (Int64.to_float size));   
-    let remaining = Int64.sub size offset in
-    if remaining=0L 
-    then ()  (* EOF *)
-    else
-      begin
-       let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
-       Unixext.really_read ifd buf 0 this_chunk;
-
-       if not sparse || (not (allzero buf this_chunk))
-       then Output.write stream offset buf 0 this_chunk;
-
-       do_block (Int64.add offset (Int64.of_int this_chunk))
-      end
-  in
-  do_block 0L;
-  Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
-  update 1.0
-
-end
-
-module LocalDD = DD(FileStream)
-module RemoteDD = DD(NetworkStream)
-
 (* SCTX-286: thin provisioning is thrown away over VDI.copy, VM.import(VM.export).
    Return true if the newly created vdi must have zeroes written into it; default to false
    under the assumption that "proper" storage devices (ie not our legacy LVM stuff) always 
@@ -286,47 +192,30 @@ let must_write_zeroes_into_new_vdi ~__context vdi =
 let copy_vdi ~__context vdi_src vdi_dst = 
   TaskHelper.set_cancellable ~__context;
   Helpers.call_api_functions ~__context (fun rpc session_id ->
-  let refresh_session = Xapi_session.consider_touching_session rpc session_id in
-
 
   (* Use the sparse copy unless we must write zeroes into the new VDI *)
   let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
 
   (* Copy locally unless this host can't see the destination SR *)
-  let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
-  let local_copy = Importexport.check_sr_availability ~__context sr_dst in
+  let can_local_copy = Importexport.check_sr_availability ~__context (Db.VDI.get_SR ~__context ~self:vdi_dst) in
 
   let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
-  let blocksize = 1024*1024 in
 
-  debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving sparseness" (if local_copy then "locally" else "remotely") size blocksize (if sparse then "" else " NOT");
+  let local_copy = can_local_copy && not (Xapi_fist.force_remote_vdi_copy ()) in
 
-  let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
-  let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
+  debug "Sm_fs_ops.copy_vdi: %s-copying %Ld%s preserving sparseness" (if local_copy then "locally" else "remotely") size (if sparse then "" else " NOT");
 
 try
+       let remote_uri = import_vdi_url ~__context rpc session_id vdi_dst in
+       debug "remote_uri = %s" remote_uri;
        with_block_attached_device __context rpc session_id vdi_src `RO
        (fun device_src ->
-               let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
-               finally
-               (fun () ->
-                       if local_copy && not (Xapi_fist.force_remote_vdi_copy ())
-                       then with_block_attached_device __context rpc session_id vdi_dst `RW
-                               (fun device_dst ->
-                                       let ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
-                                       finally
-                                       (fun () ->
-                                               local_dd ifd ofd size blocksize
-                                       )
-                                       (fun () -> Unix.close ofd)
-
-                               )
-                       else with_import_vdi __context rpc session_id vdi_dst
-                               (fun ofd ->
-                                       remote_dd ifd ofd size blocksize
-                               )
+                       if local_copy
+               then with_block_attached_device __context rpc session_id vdi_dst `RW
+               (fun device_dst ->
+                       Sparse_dd_wrapper.dd ~__context sparse device_src device_dst size
                )
-               (fun () -> Unix.close ifd)
+               else Sparse_dd_wrapper.dd ~__context sparse device_src remote_uri size
        )
 with
 | Unix.Unix_error(Unix.EIO, _, _) ->
index 7fd5fab5073d5819e04c58c7aff12986b18e6c58..10755c301fce8f09d0f35bbc9776343bc7e0c57b 100644 (file)
@@ -17,6 +17,7 @@
 
 val with_block_attached_devices :    Context.t -> (XMLRPC.xmlrpc -> XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI list -> API.vbd_mode -> (string list -> 'a) -> 'a
 val with_block_attached_device  :    Context.t -> (XMLRPC.xmlrpc -> XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode -> (string -> 'a) -> 'a
+val with_open_block_attached_device  :    Context.t -> (XMLRPC.xmlrpc -> XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode -> (Unix.file_descr -> 'a) -> 'a
 val with_new_fs_vdi : Context.t -> name_label:string -> name_description:string -> sR:API.ref_SR -> required_space:int64 -> _type:API.vdi_type ->
   sm_config:API.string_to_string_map -> (API.ref_VDI -> string -> 'a) -> 'a
 val with_fs_vdi :   Context.t -> API.ref_VDI -> (string -> 'a) -> 'a