From: Vincent Hanquez Date: Thu, 13 Aug 2009 09:55:04 +0000 (+0100) Subject: add ability to receive message from thread with a queue that we actively check. X-Git-Url: http://xenbits.xensource.com/gitweb?a=commitdiff_plain;h=44a42bd3d999e35005ff4652af56c849b0494fe1;p=xenclient%2Ftoolstack.git add ability to receive message from thread with a queue that we actively check. future work will involve beeing wakeup where there's actual events, instead of checking every 200ms. --- diff --git a/xenvm/xenvm.ml b/xenvm/xenvm.ml index f3f1625..8a7143f 100644 --- a/xenvm/xenvm.ml +++ b/xenvm/xenvm.ml @@ -17,6 +17,7 @@ open Printf open Pervasiveext open Stringext +open Threadext open Vmconfig open Vmstate @@ -655,16 +656,19 @@ let monitor_rpc_dbus state = DBus.Bus.add_match bus match_s false; DBus.Connection.flush bus; + let outgoing_mutex = Mutex.create () in + let outgoing = Queue.create () in + let calltask msg msg_method params = let xenvmlib_to_dbus rep = match rep with - | Xenvmlib.Ok -> Some (DBus.Message.new_method_return msg) + | Xenvmlib.Ok -> DBus.Message.new_method_return msg | Xenvmlib.Msg s -> let rmsg = DBus.Message.new_method_return msg in DBus.Message.append rmsg [ DBus.String s ]; - Some rmsg - | Xenvmlib.Error err -> Some (DBus.Message.new_error msg DBus.ERR_FAILED err) - | _ -> None + rmsg + | Xenvmlib.Error err -> DBus.Message.new_error msg DBus.ERR_FAILED err + | _ -> DBus.Message.new_error msg DBus.ERR_FAILED "?" in (* if the tasks need to be threaded like start,reboot,.. we returns none to the caller and create a thread that is going to populate a queue @@ -678,11 +682,11 @@ let monitor_rpc_dbus state = thread_create (fun () -> let rep = do_task state (act, taskargs) in let rep = xenvmlib_to_dbus rep in - ignore rep (* FIXME put in the queue *) + Mutex.execute outgoing_mutex (fun () -> Queue.push rep outgoing); ) (); None ) else - xenvmlib_to_dbus (do_task state (act, taskargs)) + Some (xenvmlib_to_dbus (do_task state (act, taskargs))) in let process_message msg = @@ -703,14 +707,22 @@ let monitor_rpc_dbus state = in while not state.vm_monitors.monitor_dbus_quit do - (* check for outputing work *) - DBus.Connection.read_write bus 200; + (* check for outgoing work *) + let outmsgs = Mutex.execute outgoing_mutex (fun () -> + List.rev (Queue.fold (fun acc x -> x :: acc) [] outgoing) + ) in + if outmsgs <> [] then ( + List.iter (fun out -> let (_: int32) = DBus.Connection.send bus out in ()) outmsgs; + DBus.Connection.flush bus + ); + let still_connected = DBus.Connection.read_write bus 200 in + ignore still_connected; (* FIXME we should probably do something more sensible here *) let reply = match DBus.Connection.pop_message bus with | None -> None | Some msg -> process_message msg in - maybe (fun reply -> DBus.Connection.send bus reply; DBus.Connection.flush bus) reply + maybe (fun reply -> Mutex.execute outgoing_mutex (fun () -> Queue.push reply outgoing)) reply done; ()