From 9fe59bca832a42e9e9dd5ccfa34b70fc04520e8a Mon Sep 17 00:00:00 2001 From: Vincent Hanquez Date: Mon, 17 Aug 2009 13:32:53 +0100 Subject: [PATCH] grab bus name org.xen.vm.%uuid and process message correctly. also flush the queue after making the list, instead of queueing them indefinitely. --- xenvm/xenvm.ml | 54 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/xenvm/xenvm.ml b/xenvm/xenvm.ml index 99a5c1d..845ec47 100644 --- a/xenvm/xenvm.ml +++ b/xenvm/xenvm.ml @@ -642,9 +642,15 @@ let monitor_rpc_json socket state = let monitor_rpc_dbus state = let use_session = state.vm_monitors.monitor_use_dbus_session in - let match_s = sprintf "type='method',interface='org.xen.vm.%s'" (String.replace "-" "_" state.vm_uuid) in + let intf = Printf.sprintf "org.xen.vm.%s" (String.replace "-" "_" state.vm_uuid) in + let match_s = sprintf "type='method',interface='%s'" intf in let bus = DBus.Bus.get (if use_session then DBus.Bus.Session else DBus.Bus.System) in - DBus.Bus.add_match bus match_s false; + let reply = DBus.Bus.request_name bus intf [ DBus.Bus.DoNotQueue ] in + (match reply with + | DBus.Bus.PrimaryOwner -> () + | _ -> failwith (Printf.sprintf "cannot grab dbus intf %s" intf) + ); + (*DBus.Bus.add_match bus match_s false;*) DBus.Connection.flush bus; let outgoing_mutex = Mutex.create () in @@ -666,10 +672,11 @@ let monitor_rpc_dbus state = with the return message when ready *) let t = try Some (Tasks.find msg_method) with exn -> None in match t with - | None -> Some (DBus.Message.new_error msg DBus.ERR_SERVICE_UNKNOWN "no rpc") + | None -> Some (DBus.Message.new_error msg DBus.ERR_SERVICE_UNKNOWN "no rpc") | Some (act, task_descr) -> let taskargs = List.map (fun (k, v) -> (k, Tasks.ValString v)) params in if task_descr.Tasks.need_threading then ( + info "creating thread for handling %s" msg_method; thread_create (fun () -> let rep = do_task state (act, taskargs) in let rep = xenvmlib_to_dbus rep in @@ -680,9 +687,9 @@ let monitor_rpc_dbus state = Some (xenvmlib_to_dbus (do_task state (act, taskargs))) in - let process_message msg = + let process_method_call msg = let params = DBus.Message.get msg in - let msg_method = match DBus.Message.get_member msg with None -> assert false | Some m -> m in + let msg_method = match DBus.Message.get_member msg with None -> "missingmethod" | Some m -> m in match params with | [ DBus.Array DBus.Dicts ((_, _), msg_params) ] -> let params = List.map (fun (k, v) -> @@ -691,31 +698,60 @@ let monitor_rpc_dbus state = | DBus.String key, DBus.Variant (DBus.String value) -> key, value | _ -> assert false (* replace by sensible error *) ) msg_params in - calltask msg msg_method params + (try calltask msg msg_method params + with exn -> + warn "dbus_monitor received exception: %s\n" (Printexc.to_string exn); + Some (DBus.Message.new_error msg DBus.ERR_FAILED "?") + ) | _ -> let err_msg = DBus.Message.new_error msg DBus.ERR_INVALID_ARGS "expecting string method followed by dictionnary" in Some (err_msg) in + let process_message msg = + let ty = DBus.Message.get_type msg in + match ty with + | DBus.Message.Error -> + let error_name = match DBus.Message.get_error_name msg with None -> (-1) | Some x -> (Obj.magic x) in + info "processing error message %d" error_name; + None + | DBus.Message.Method_call -> + info "processing message %s" (DBus.Message.string_of_message_ty ty); + process_method_call msg + | DBus.Message.Signal -> + None + | _ -> + info "unknown dbus message %s" (DBus.Message.string_of_message_ty ty); + let err_msg = DBus.Message.new_error msg DBus.ERR_INVALID_ARGS + "expecting string method followed by dictionnary" in + Some (err_msg) + in while not state.vm_monitors.monitor_dbus_quit do (* check for outgoing work *) let outmsgs = Mutex.execute outgoing_mutex (fun () -> - List.rev (Queue.fold (fun acc x -> x :: acc) [] outgoing) + let m = List.rev (Queue.fold (fun acc x -> x :: acc) [] outgoing) in + Queue.clear outgoing; + m ) in if outmsgs <> [] then ( List.iter (fun out -> let (_: int32) = DBus.Connection.send bus out in ()) outmsgs; DBus.Connection.flush bus ); - let still_connected = DBus.Connection.read_write bus 200 in + let still_connected = DBus.Connection.read_write_dispatch bus 200 in ignore still_connected; (* FIXME we should probably do something more sensible here *) let reply = match DBus.Connection.pop_message bus with | None -> None | Some msg -> process_message msg in - maybe (fun reply -> Mutex.execute outgoing_mutex (fun () -> Queue.push reply outgoing)) reply + maybe (fun reply -> + Mutex.execute outgoing_mutex (fun () -> + Queue.push reply outgoing + ) + ) reply done; + notify_quit state; () (* -- 2.39.5