From ee17fe2348ba88231dd52eebad80985c84e11913 Mon Sep 17 00:00:00 2001 From: Prashanth Mundkur Date: Thu, 16 Jul 2009 14:17:47 -0700 Subject: [PATCH] [dbus_conn] async dbus connection --- common/dbus_conn.ml | 196 ++++++++++++++++++++++++++++++++++++++++ common/dbus_conn.mli | 31 +++++++ common/test/Makefile | 32 +++++++ common/test/dbus_mon.ml | 76 ++++++++++++++++ 4 files changed, 335 insertions(+) create mode 100644 common/dbus_conn.ml create mode 100644 common/dbus_conn.mli create mode 100644 common/test/Makefile create mode 100644 common/test/dbus_mon.ml diff --git a/common/dbus_conn.ml b/common/dbus_conn.ml new file mode 100644 index 0000000..e1dbdf2 --- /dev/null +++ b/common/dbus_conn.ml @@ -0,0 +1,196 @@ +(* + * Copyright (C) 2009 Citrix Ltd. + * Author Prashanth Mundkur + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +type t = +{ + ev_loop : Eventloop.t; + ev_handle : Eventloop.handle; + ev_fd : Unix.file_descr; + + bus : DBus.bus; + callbacks : callbacks; + + mutable watches : DBus.watch list; + mutable timeouts : (DBus.timeout * Eventloop.timer) list; + mutable inactive_timeouts : DBus.timeout list; +} + +and callbacks = +{ + msg_received_callback : t -> DBus.message -> unit; + error_callback : t -> Eventloop.error -> unit; +} + +module Conns = Connection_table.Make(struct type conn = t end) + +let send conn msg = + DBus.Connection.send conn.bus msg + +let enable_recv conn = + Eventloop.enable_recv conn.ev_loop conn.ev_handle + +let disable_recv conn = + Eventloop.disable_recv conn.ev_loop conn.ev_handle + +let dispatch conn = + ignore (DBus.Connection.dispatch conn.bus) + +let toggle_watch_callback conn watch = + (* Only handle enables here; disables are handled during event dispatch. *) + let flags = DBus.Watch.get_flags watch in + if List.mem DBus.Watch.Readable flags then + Eventloop.enable_recv conn.ev_loop conn.ev_handle; + if List.mem DBus.Watch.Writable flags then + Eventloop.enable_send conn.ev_loop conn.ev_handle + +let add_watch_callback conn watch = + conn.watches <- watch :: conn.watches; + toggle_watch_callback conn watch; + true + +let remove_watch_callback conn watch = + conn.watches <- List.filter (fun w -> w <> watch) conn.watches + +let remove_timeout_callback conn timeout = + (* Scan both lists to increase robustness (instead + of using the timeout state to select the list). *) + let updated_timeouts = + List.fold_left (fun acc (t, h) -> + if t = timeout then begin + Eventloop.cancel_timer conn.ev_loop h; + acc + end else + (t, h) :: acc + ) [] conn.timeouts + in + conn.timeouts <- updated_timeouts; + conn.inactive_timeouts <- List.filter (fun t -> t <> timeout) conn.inactive_timeouts + +let timeout_handler conn timeout () = + (* We cannot use remove_timeout_callback here since the + eventloop handle is now invalid. *) + let updated_timeouts = + List.fold_left (fun acc (t,h) -> + if t = timeout then acc else (t,h) :: acc + ) [] conn.timeouts + in + conn.timeouts <- updated_timeouts; + remove_timeout_callback conn timeout; + DBus.Timeout.handle timeout + +let add_timeout_callback conn timeout = + if DBus.Timeout.get_enabled timeout then + let expiry = DBus.Timeout.get_interval timeout in + (* Eventloop timers are currently in float seconds, + whereas DBus timeouts are in int milliseconds. *) + let expiry = (float_of_int expiry) /. 1000. in + let th = Eventloop.start_timer conn.ev_loop expiry (timeout_handler conn timeout) in + conn.timeouts <- (timeout, th) :: conn.timeouts + else + conn.inactive_timeouts <- timeout :: conn.inactive_timeouts; + true + +let toggle_timeout_callback conn timeout = + (* There is little need to optimize this, since we can + assume that DBus uses this only when the state of the + timer has changed. *) + remove_timeout_callback conn timeout; + ignore (add_timeout_callback conn timeout) + +let recv_ready_callback el h fd = + let conn = Conns.get_conn h in + (* Since the set of watches might be modified during the + callbacks, we need to check if the watches are still + active. Also, keep track whether a watch was dispatched; + if none was, we need to disable the event. *) + let can_dispatch w = + ((DBus.Watch.get_enabled w) + && (List.mem DBus.Watch.Readable (DBus.Watch.get_flags w))) in + let dispatched = ref false in + let watches = conn.watches in + List.iter (fun w -> + if List.memq w conn.watches && can_dispatch w + then begin + dispatched := true; + DBus.Watch.handle w [ DBus.Watch.Readable ] + end + ) watches; + if not !dispatched then + Eventloop.disable_recv el h + else + dispatch conn + +let send_ready_callback el h fd = + let conn = Conns.get_conn h in + (* Since the set of watches might be modified during the + callbacks, we need to check if the watches are still + active. Also, keep track whether a watch was dispatched; + if none was, we need to disable the event. *) + let can_dispatch w = + ((DBus.Watch.get_enabled w) + && (List.mem DBus.Watch.Writable (DBus.Watch.get_flags w))) in + let dispatched = ref false in + let watches = conn.watches in + List.iter (fun w -> + if List.memq w conn.watches && can_dispatch w + then begin + dispatched := true; + DBus.Watch.handle w [ DBus.Watch.Writable ] + end + ) watches; + if not !dispatched then + Eventloop.disable_send el h + +let error_callback el h err = + let conn = Conns.get_conn h in + conn.callbacks.error_callback conn err + +let db_callbacks = +{ + Eventloop.accept_callback = (fun _ _ _ _ -> assert false); + Eventloop.connect_callback = (fun _ _ -> assert false); + Eventloop.recv_ready_callback = recv_ready_callback; + Eventloop.send_ready_callback = send_ready_callback; + Eventloop.error_callback = error_callback; +} + +let attach bus ev_loop callbacks = + let fd = DBus.Connection.get_fd bus in + let ev_handle = Eventloop.register_conn ev_loop fd ~enable_send:false ~enable_recv:false db_callbacks in + let conn = { ev_loop = ev_loop; + ev_handle = ev_handle; + ev_fd = fd; + bus = bus; + watches = []; + timeouts = []; + inactive_timeouts = []; + callbacks = callbacks; + } in + let add_watch_fn = add_watch_callback conn in + let rm_watch_fn = remove_watch_callback conn in + let toggle_watch_fn = toggle_watch_callback conn in + let add_timeout_fn = add_timeout_callback conn in + let rm_timeout_fn = remove_timeout_callback conn in + let toggle_timeout_fn = toggle_timeout_callback conn in + let filter _ msg = (callbacks.msg_received_callback conn msg; true) in + DBus.Connection.set_watch_functions bus (add_watch_fn, rm_watch_fn, Some toggle_watch_fn); + DBus.Connection.set_timeout_functions bus (add_timeout_fn, rm_timeout_fn, Some toggle_timeout_fn); + DBus.Connection.add_filter bus filter; + Conns.add_conn ev_handle conn; + conn + +let detach conn = + Conns.remove_conn conn.ev_handle; + Eventloop.remove_conn conn.ev_loop conn.ev_handle diff --git a/common/dbus_conn.mli b/common/dbus_conn.mli new file mode 100644 index 0000000..df8ed6b --- /dev/null +++ b/common/dbus_conn.mli @@ -0,0 +1,31 @@ +(* + * Copyright (C) 2009 Citrix Ltd. + * Author Prashanth Mundkur + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +type t + +type callbacks = +{ + msg_received_callback : t -> DBus.message -> unit; + error_callback : t -> Eventloop.error -> unit; +} + +val attach : DBus.bus -> Eventloop.t -> callbacks -> t +val detach : t -> unit + +val send : t -> DBus.message -> int32 +val dispatch : t -> unit + +val enable_recv : t -> unit +val disable_recv : t -> unit diff --git a/common/test/Makefile b/common/test/Makefile new file mode 100644 index 0000000..f491304 --- /dev/null +++ b/common/test/Makefile @@ -0,0 +1,32 @@ +TOPLEVEL=../.. +include $(TOPLEVEL)/common.make + +DBUS_INSTALL_DIR ?= $(OCAML_DIR)/dist/lib/ocaml + +OCAMLINCLUDE += -I $(TOPLEVEL)/libs/stdext -I $(TOPLEVEL)/common \ + -I $(DBUS_INSTALL_DIR) + +OCAMLOPTFLAGS += -g -thread + +dbus_mon_OBJS = \ + $(TOPLEVEL)/common/connection_table $(TOPLEVEL)/common/async_conn \ + $(TOPLEVEL)/common/dbus_conn dbus_mon + +dbus_mon_LIBS = unix.cmxa \ + $(TOPLEVEL)/libs/uuid/uuid.cmxa \ + -ccopt -L -ccopt $(TOPLEVEL)/libs/stdext $(TOPLEVEL)/libs/stdext/stdext.cmxa \ + -ccopt -L -ccopt $(DBUS_INSTALL_DIR) $(DBUS_INSTALL_DIR)/dBus.cmxa + +ALL_OCAML_OBJS = $(dbus_mon_OBJS) + +INTF = + +OCAML_PROGRAM = dbus_mon + +PROGRAMS = $(OCAML_PROGRAM) + +all: $(INTF) $(PROGRAMS) + +bins: $(PROGRAMS) + +include $(TOPLEVEL)/Makefile.rules diff --git a/common/test/dbus_mon.ml b/common/test/dbus_mon.ml new file mode 100644 index 0000000..983d3c4 --- /dev/null +++ b/common/test/dbus_mon.ml @@ -0,0 +1,76 @@ +let dbus_conns = ref ([] : Dbus_conn.t list) + +let msg_received_callback conn m = + Printf.printf "Received %s:\n" (DBus.Message.string_of_message_ty (DBus.Message.get_type m)); + (match DBus.Message.get_sender m with + | Some s -> Printf.printf " Sender: %s\n" s + | None -> ()); + (match DBus.Message.get_destination m with + | Some s -> Printf.printf " Destination: %s\n" s + | None -> ()); + (match DBus.Message.get_path m with + | Some s -> Printf.printf " Path: %s\n" s + | None -> ()); + (match DBus.Message.get_interface m with + | Some s -> Printf.printf " Interface: %s\n" s + | None -> ()); + (match DBus.Message.get_member m with + | Some s -> Printf.printf " Method: %s\n" s + | None -> ()); + List.iter (fun arg -> + Printf.printf " Arg: %s\n" (DBus.string_of_ty arg) + ) (DBus.Message.get m) + +let error_callback conn err = + Printf.printf "Received error.\n" + +let callbacks = +{ + Dbus_conn.msg_received_callback = msg_received_callback; + Dbus_conn.error_callback = error_callback; +} + +let destination = "org.freedesktop.DBus" + +let make_ping () = + let path = "/" in + let interface = "org.freedesktop.DBus.Peer" in + let meth = "Ping" in + let msg = DBus.Message.new_method_call destination path interface meth + in msg + +let make_get_machine_id () = + let path = "/" in + let interface = "org.freedesktop.DBus.Peer" in + let meth = "GetMachineId" in + let msg = DBus.Message.new_method_call destination path interface meth + in msg + +let loop el conn = + while Eventloop.has_connections el || Eventloop.has_timers el + do + (* We need to keep this select timeout small, due to + the broken libdbus async API, which requires + dispatch to be called at each iteration of the main + loop. *) + Eventloop.dispatch el 0.5; + List.iter (fun db -> Dbus_conn.dispatch db) !dbus_conns; + done + +let main () = + let el = Eventloop.create () in + let bus = DBus.Bus.get DBus.Bus.System in + let conn = Dbus_conn.attach bus el callbacks in + dbus_conns := conn :: !dbus_conns; + Dbus_conn.enable_recv conn; + ignore (Dbus_conn.send conn (make_ping ())); + ignore (Dbus_conn.send conn (make_get_machine_id ())); + loop el conn + +let _ = + Printexc.record_backtrace true; + + try main () + with e -> + Printf.printf "Uncaught exception: %s at\n" (Printexc.to_string e); + Printf.printf "%s\n" (Printexc.get_backtrace ()) -- 2.39.5