open Printf
open Pervasiveext
open Stringext
+open Threadext
open Vmconfig
open Vmstate
DBus.Bus.add_match bus match_s false;
DBus.Connection.flush bus;
+ let outgoing_mutex = Mutex.create () in
+ let outgoing = Queue.create () in
+
let calltask msg msg_method params =
let xenvmlib_to_dbus rep =
match rep with
- | Xenvmlib.Ok -> Some (DBus.Message.new_method_return msg)
+ | Xenvmlib.Ok -> DBus.Message.new_method_return msg
| Xenvmlib.Msg s ->
let rmsg = DBus.Message.new_method_return msg in
DBus.Message.append rmsg [ DBus.String s ];
- Some rmsg
- | Xenvmlib.Error err -> Some (DBus.Message.new_error msg DBus.ERR_FAILED err)
- | _ -> None
+ rmsg
+ | Xenvmlib.Error err -> DBus.Message.new_error msg DBus.ERR_FAILED err
+ | _ -> DBus.Message.new_error msg DBus.ERR_FAILED "?"
in
(* if the tasks need to be threaded like start,reboot,.. we returns
none to the caller and create a thread that is going to populate a queue
thread_create (fun () ->
let rep = do_task state (act, taskargs) in
let rep = xenvmlib_to_dbus rep in
- ignore rep (* FIXME put in the queue *)
+ Mutex.execute outgoing_mutex (fun () -> Queue.push rep outgoing);
) ();
None
) else
- xenvmlib_to_dbus (do_task state (act, taskargs))
+ Some (xenvmlib_to_dbus (do_task state (act, taskargs)))
in
let process_message msg =
in
while not state.vm_monitors.monitor_dbus_quit do
- (* check for outputing work *)
- DBus.Connection.read_write bus 200;
+ (* check for outgoing work *)
+ let outmsgs = Mutex.execute outgoing_mutex (fun () ->
+ List.rev (Queue.fold (fun acc x -> x :: acc) [] outgoing)
+ ) in
+ if outmsgs <> [] then (
+ List.iter (fun out -> let (_: int32) = DBus.Connection.send bus out in ()) outmsgs;
+ DBus.Connection.flush bus
+ );
+ let still_connected = DBus.Connection.read_write bus 200 in
+ ignore still_connected; (* FIXME we should probably do something more sensible here *)
let reply =
match DBus.Connection.pop_message bus with
| None -> None
| Some msg -> process_message msg
in
- maybe (fun reply -> DBus.Connection.send bus reply; DBus.Connection.flush bus) reply
+ maybe (fun reply -> Mutex.execute outgoing_mutex (fun () -> Queue.push reply outgoing)) reply
done;
()