]> xenbits.xensource.com Git - xenclient/toolstack.git/commitdiff
grab bus name org.xen.vm.%uuid and process message correctly.
authorVincent Hanquez <vincent.hanquez@eu.citrix.com>
Mon, 17 Aug 2009 12:32:53 +0000 (13:32 +0100)
committerVincent Hanquez <vincent.hanquez@eu.citrix.com>
Mon, 17 Aug 2009 12:32:53 +0000 (13:32 +0100)
also flush the queue after making the list, instead of queueing them indefinitely.

xenvm/xenvm.ml

index 99a5c1dffffeb7bc69ec4e9e71d8d3de33dbfbab..845ec47bc0d7b0e150e8814d7234a78ac2d58514 100644 (file)
@@ -642,9 +642,15 @@ let monitor_rpc_json socket state =
 
 let monitor_rpc_dbus state =
        let use_session = state.vm_monitors.monitor_use_dbus_session in
-       let match_s = sprintf "type='method',interface='org.xen.vm.%s'" (String.replace "-" "_" state.vm_uuid) in
+       let intf = Printf.sprintf "org.xen.vm.%s" (String.replace "-" "_" state.vm_uuid) in
+       let match_s = sprintf "type='method',interface='%s'" intf in
        let bus = DBus.Bus.get (if use_session then DBus.Bus.Session else DBus.Bus.System) in
-       DBus.Bus.add_match bus match_s false;
+       let reply = DBus.Bus.request_name bus intf [ DBus.Bus.DoNotQueue ] in
+       (match reply with
+       | DBus.Bus.PrimaryOwner -> ()
+       | _                     -> failwith (Printf.sprintf "cannot grab dbus intf %s" intf)
+       );
+       (*DBus.Bus.add_match bus match_s false;*)
        DBus.Connection.flush bus;
 
        let outgoing_mutex = Mutex.create () in
@@ -666,10 +672,11 @@ let monitor_rpc_dbus state =
                  with the return message when ready *)
                let t = try Some (Tasks.find msg_method) with exn -> None in
                match t with
-               | None   -> Some (DBus.Message.new_error msg DBus.ERR_SERVICE_UNKNOWN "no rpc")
+               | None -> Some (DBus.Message.new_error msg DBus.ERR_SERVICE_UNKNOWN "no rpc")
                | Some (act, task_descr) ->
                        let taskargs = List.map (fun (k, v) -> (k, Tasks.ValString v)) params in
                        if task_descr.Tasks.need_threading then (
+                               info "creating thread for handling %s" msg_method;
                                thread_create (fun () ->
                                        let rep = do_task state (act, taskargs) in
                                        let rep = xenvmlib_to_dbus rep in
@@ -680,9 +687,9 @@ let monitor_rpc_dbus state =
                                Some (xenvmlib_to_dbus (do_task state (act, taskargs)))
                in
 
-       let process_message msg =
+       let process_method_call msg =
                let params = DBus.Message.get msg in
-               let msg_method = match DBus.Message.get_member msg with None -> assert false | Some m -> m in
+               let msg_method = match DBus.Message.get_member msg with None -> "missingmethod" | Some m -> m in
                match params with
                | [ DBus.Array DBus.Dicts ((_, _), msg_params) ] ->
                        let params = List.map (fun (k, v) ->
@@ -691,31 +698,60 @@ let monitor_rpc_dbus state =
                                | DBus.String key, DBus.Variant (DBus.String value) -> key, value
                                | _ -> assert false (* replace by sensible error *)
                        ) msg_params in
-                       calltask msg msg_method params
+                       (try calltask msg msg_method params
+                       with exn ->
+                               warn "dbus_monitor received exception: %s\n" (Printexc.to_string exn);
+                               Some (DBus.Message.new_error msg DBus.ERR_FAILED "?")
+                       )
                | _ ->
                        let err_msg = DBus.Message.new_error msg DBus.ERR_INVALID_ARGS
                                      "expecting string method followed by dictionnary" in
                        Some (err_msg)
                in
+       let process_message msg =
+               let ty = DBus.Message.get_type msg in
+               match ty with
+               | DBus.Message.Error ->
+                       let error_name = match DBus.Message.get_error_name msg with None -> (-1) | Some x -> (Obj.magic x) in
+                       info "processing error message %d" error_name;
+                       None
+               | DBus.Message.Method_call ->
+                       info "processing message %s" (DBus.Message.string_of_message_ty ty);
+                       process_method_call msg
+               | DBus.Message.Signal ->
+                       None
+               | _ ->
+                       info "unknown dbus message %s" (DBus.Message.string_of_message_ty ty);
+                       let err_msg = DBus.Message.new_error msg DBus.ERR_INVALID_ARGS
+                                     "expecting string method followed by dictionnary" in
+                       Some (err_msg)
+               in
 
        while not state.vm_monitors.monitor_dbus_quit do
                (* check for outgoing work *)
                let outmsgs = Mutex.execute outgoing_mutex (fun () ->
-                       List.rev (Queue.fold (fun acc x -> x :: acc) [] outgoing)
+                       let m = List.rev (Queue.fold (fun acc x -> x :: acc) [] outgoing) in
+                       Queue.clear outgoing;
+                       m
                ) in
                if outmsgs <> [] then (
                        List.iter (fun out -> let (_: int32) = DBus.Connection.send bus out in ()) outmsgs;
                        DBus.Connection.flush bus
                );
-               let still_connected = DBus.Connection.read_write bus 200 in
+               let still_connected = DBus.Connection.read_write_dispatch bus 200 in
                ignore still_connected; (* FIXME we should probably do something more sensible here *)
                let reply =
                        match DBus.Connection.pop_message bus with
                        | None     -> None
                        | Some msg -> process_message msg
                        in
-               maybe (fun reply -> Mutex.execute outgoing_mutex (fun () -> Queue.push reply outgoing)) reply
+               maybe (fun reply ->
+                       Mutex.execute outgoing_mutex (fun () ->
+                               Queue.push reply outgoing
+                       )
+               ) reply
        done;
+       notify_quit state;
        ()
 
 (*