]> xenbits.xensource.com Git - xenclient/toolstack.git/commitdiff
add ability to receive message from thread with a queue that we actively check.
authorVincent Hanquez <vincent.hanquez@eu.citrix.com>
Thu, 13 Aug 2009 09:55:04 +0000 (10:55 +0100)
committerVincent Hanquez <vincent.hanquez@eu.citrix.com>
Thu, 13 Aug 2009 09:55:04 +0000 (10:55 +0100)
future work will involve beeing wakeup where there's actual events, instead of checking every 200ms.

xenvm/xenvm.ml

index f3f1625ce5dc076334cc03ed040eba2488ee1c8d..8a7143fe9b4b1e0f0c8b10c7e6a30cb2399c6063 100644 (file)
@@ -17,6 +17,7 @@
 open Printf
 open Pervasiveext
 open Stringext
+open Threadext
 open Vmconfig
 open Vmstate
 
@@ -655,16 +656,19 @@ let monitor_rpc_dbus state =
        DBus.Bus.add_match bus match_s false;
        DBus.Connection.flush bus;
 
+       let outgoing_mutex = Mutex.create () in
+       let outgoing = Queue.create () in
+
        let calltask msg msg_method params =
                let xenvmlib_to_dbus rep =
                        match rep with
-                       | Xenvmlib.Ok        -> Some (DBus.Message.new_method_return msg)
+                       | Xenvmlib.Ok        -> DBus.Message.new_method_return msg
                        | Xenvmlib.Msg s     ->
                                let rmsg = DBus.Message.new_method_return msg in
                                DBus.Message.append rmsg [ DBus.String s ];
-                               Some rmsg
-                       | Xenvmlib.Error err -> Some (DBus.Message.new_error msg DBus.ERR_FAILED err)
-                       | _                  -> None
+                               rmsg
+                       | Xenvmlib.Error err -> DBus.Message.new_error msg DBus.ERR_FAILED err
+                       | _                  -> DBus.Message.new_error msg DBus.ERR_FAILED "?"
                        in
                (* if the tasks need to be threaded like start,reboot,.. we returns
                  none to the caller and create a thread that is going to populate a queue
@@ -678,11 +682,11 @@ let monitor_rpc_dbus state =
                                thread_create (fun () ->
                                        let rep = do_task state (act, taskargs) in
                                        let rep = xenvmlib_to_dbus rep in
-                                       ignore rep (* FIXME put in the queue *)
+                                       Mutex.execute outgoing_mutex (fun () -> Queue.push rep outgoing);
                                ) ();
                                None
                        ) else
-                               xenvmlib_to_dbus (do_task state (act, taskargs))
+                               Some (xenvmlib_to_dbus (do_task state (act, taskargs)))
                in
 
        let process_message msg =
@@ -703,14 +707,22 @@ let monitor_rpc_dbus state =
                in
 
        while not state.vm_monitors.monitor_dbus_quit do
-               (* check for outputing work *)
-               DBus.Connection.read_write bus 200;
+               (* check for outgoing work *)
+               let outmsgs = Mutex.execute outgoing_mutex (fun () ->
+                       List.rev (Queue.fold (fun acc x -> x :: acc) [] outgoing)
+               ) in
+               if outmsgs <> [] then (
+                       List.iter (fun out -> let (_: int32) = DBus.Connection.send bus out in ()) outmsgs;
+                       DBus.Connection.flush bus
+               );
+               let still_connected = DBus.Connection.read_write bus 200 in
+               ignore still_connected; (* FIXME we should probably do something more sensible here *)
                let reply =
                        match DBus.Connection.pop_message bus with
                        | None     -> None
                        | Some msg -> process_message msg
                        in
-               maybe (fun reply -> DBus.Connection.send bus reply; DBus.Connection.flush bus) reply
+               maybe (fun reply -> Mutex.execute outgoing_mutex (fun () -> Queue.push reply outgoing)) reply
        done;
        ()