open Sparse_encoding
open Unixext
open Pervasiveext
+open Client
let receive_chunks (s: Unix.file_descr) (fd: Unix.file_descr) =
Chunk.fold (fun () -> Chunk.write fd) () s
let vdi_of_req ~__context (req: request) =
+ let all = req.Http.query @ req.Http.cookie in
let vdi =
- if List.mem_assoc "vdi" req.Http.query
- then List.assoc "vdi" req.Http.query
+ if List.mem_assoc "vdi" all
+ then List.assoc "vdi" all
else raise (Failure "Missing vdi query parameter") in
if Db_cache.DBCache.is_valid_ref vdi
then Ref.of_string vdi
req.close <- true;
Xapi_http.with_context "Importing raw VDI" req s
(fun __context ->
+ let all = req.Http.query @ req.Http.cookie in
let vdi = vdi_of_req ~__context req in
- let chunked = List.mem_assoc "chunked" req.Http.query in
- try
- match req.transfer_encoding, req.content_length with
- | Some "chunked", _ ->
+ let chunked = List.mem_assoc "chunked" all in
+ let task_id = Context.get_task_id __context in
+ debug "import_raw_vdi task_id = %s vdi = %s; chunked = %b" (Ref.string_of task_id) (Ref.string_of vdi) chunked;
+ try
+ match req.transfer_encoding with
+ | Some "chunked" ->
error "Chunked encoding not yet implemented in the import code";
Http_svr.headers s http_403_forbidden;
raise (Failure "import code cannot handle chunked encoding")
- | None, Some len ->
+ | None ->
let headers = Http.http_200_ok ~keep_alive:false () @
- [ Http.task_id_hdr ^ ":" ^ (Ref.string_of (Context.get_task_id __context));
+ [ Http.task_id_hdr ^ ":" ^ (Ref.string_of task_id);
content_type ] in
Http_svr.headers s headers;
-
+ Server_helpers.exec_with_new_task "VDI.import"
+ (fun __context ->
Sm_fs_ops.with_block_attached_device __context rpc session_id vdi `RW
(fun device ->
let fd = Unix.openfile device [ Unix.O_WRONLY ] 0 in
try
if chunked
then receive_chunks s fd
- else ignore(Unixext.copy_file ~limit:len s fd);
- Unixext.fsync fd
+ else ignore(Unixext.copy_file ?limit:req.content_length s fd);
+ Unixext.fsync fd;
with Unix.Unix_error(Unix.EIO, _, _) ->
raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O errors"]))
)
(fun () -> Unix.close fd)
- );
-
- TaskHelper.complete ~__context []
+ )
+ );
+ TaskHelper.complete ~__context [];
with e ->
+ error "Caught exception: %s" (ExnHelper.string_of_exn e);
+ log_backtrace ();
TaskHelper.failed ~__context (Api_errors.internal_error, ["Caught exception: " ^ (ExnHelper.string_of_exn e)]);
raise e)
| vdi :: vdis -> with_block_attached_device __context rpc session_id vdi mode (fun path -> loop (path :: acc) vdis) in
loop [] vdis
+(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
+let with_import_vdi __context rpc session_id vdi f =
+ let subtask_of = Context.get_task_id __context in
+ Server_helpers.exec_with_new_task "VDI.import"
+ (fun __context ->
+ (* Find a suitable host for the SR containing the VDI *)
+ let sr = Db.VDI.get_SR ~__context ~self:vdi in
+ let host = Importexport.find_host_for_sr ~__context sr in
+ let address = Db.Host.get_address ~__context ~self:host in
+ let importtask = Client.Task.create rpc session_id "VDI.import" "" in
+
+ let headers = Xapi_http.http_request
+ ~cookie:(["session_id", Ref.string_of session_id;
+ "task_id", Ref.string_of importtask;
+ "vdi", Ref.string_of vdi;
+ "chunked", "true"])
+ Http.Put address Constants.import_raw_vdi_uri in
+ let writer _ _ sock = f sock; true in
+ if not (Xmlrpcclient.do_secure_http_rpc ~use_stunnel_cache:false
+ ~task_id:(Ref.string_of (Context.get_task_id __context))
+ ~host:address ~port:Xapi_globs.default_ssl_port ~headers ~body:"" writer)
+ then failwith "with_import_vdi";
+ debug "Waiting for import task (%s) to complete" (Ref.string_of importtask);
+ (* wait for the task to complete before cleaning anything up *)
+ while Client.Task.get_status rpc session_id importtask = `pending do
+ Thread.delay 1.;
+ done;
+ Client.Task.destroy rpc session_id importtask;
+ )
(** Catch those smint exceptions and convert into meaningful internal errors *)
let with_api_errors f x =
exception Cancelled
exception NonZero
+(** The copying routine can operate on anything which looks like a file-descriptor/Stream *)
+module type Stream = sig
+ type stream
+ val write: stream -> int64 -> string -> int -> int -> unit
+end
+
+(** Writes directly to a file *)
+module FileStream = struct
+ type stream = Unix.file_descr
+ let write stream stream_offset buf off len =
+ let newoff = Unix.LargeFile.lseek stream stream_offset Unix.SEEK_SET in
+ (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" (String.length buf) offset len; *)
+ let n = Unix.write stream buf off len in
+ if n < len then failwith "Short write"
+end
+
+(** Marshals data across the network in chunks *)
+module NetworkStream = struct
+ open Sparse_encoding
+ type stream = Unix.file_descr
+ let write stream stream_offset buf off len =
+ let copy = String.create len in
+ String.blit buf off copy 0 len;
+ let x = { Chunk.start = stream_offset; data = copy } in
+ Chunk.marshal stream x
+end
+
+module DD(Output: Stream) = struct
+
(* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd ofd size bs =
+let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
let round v = int_of_float (v *. 50.0) in
let update =
let oldvalue = ref (-1.0) in
begin
let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
Unixext.really_read ifd buf 0 this_chunk;
- begin
- if sparse && (allzero buf this_chunk)
- then
- ignore(Unix.LargeFile.lseek ofd (Int64.of_int this_chunk) Unix.SEEK_CUR)
- else
- let n = Unix.write ofd buf 0 this_chunk in
- (if n<this_chunk then failwith "Error!")
- end;
+
+ if not sparse || (not (allzero buf this_chunk))
+ then Output.write stream offset buf 0 this_chunk;
+
do_block (Int64.add offset (Int64.of_int this_chunk))
end
in
do_block 0L;
+ Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
update 1.0
+end
+
+module LocalDD = DD(FileStream)
+module RemoteDD = DD(NetworkStream)
+
(* SCTX-286: thin provisioning is thrown away over VDI.copy, VM.import(VM.export).
Return true if the newly created vdi must have zeroes written into it; default to false
under the assumption that "proper" storage devices (ie not our legacy LVM stuff) always
(* Use the sparse copy unless we must write zeroes into the new VDI *)
let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
+ (* Copy locally unless this host can't see the destination SR *)
+ let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
+ let local_copy = Importexport.check_sr_availability ~__context sr_dst in
+
let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
let blocksize = 1024*1024 in
- debug "Sm_fs_ops.copy_vdi: copying %Ld in blocks of %d%s preserving sparseness" size blocksize (if sparse then "" else " NOT");
-
- let dd = sparse_dd refresh_session ~__context sparse in
-
- with_block_attached_device __context rpc session_id vdi_src `RO
- (fun device_src ->
- with_block_attached_device __context rpc session_id vdi_dst `RW
- (fun device_dst ->
- let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600
- and ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
- finally
- (fun () ->
- try
- dd ifd ofd size blocksize;
- with
- | Unix.Unix_error(Unix.EIO, _, _) ->
- raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O error"]))
- | e ->
- debug "Caught exception %s" (ExnHelper.string_of_exn e);
- log_backtrace ())
- (fun () ->
- Unix.close ifd;
- Unix.close ofd)
- )
- )
- )
+ debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving sparseness" (if local_copy then "locally" else "remotely") size blocksize (if sparse then "" else " NOT");
+ let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
+ let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
+
+try
+ with_block_attached_device __context rpc session_id vdi_src `RO
+ (fun device_src ->
+ let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
+ finally
+ (fun () ->
+ if local_copy
+ then with_block_attached_device __context rpc session_id vdi_dst `RW
+ (fun device_dst ->
+ let ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
+ finally
+ (fun () ->
+ local_dd ifd ofd size blocksize
+ )
+ (fun () -> Unix.close ofd)
+
+ )
+ else with_import_vdi __context rpc session_id vdi_dst
+ (fun ofd ->
+ remote_dd ifd ofd size blocksize
+ )
+ )
+ (fun () -> Unix.close ifd)
+ )
+with
+| Unix.Unix_error(Unix.EIO, _, _) ->
+ raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O error"]))
+| e ->
+ debug "Caught exception %s" (ExnHelper.string_of_exn e);
+ log_backtrace ();
+ raise e
+)