(* A module that supports finding a timer by handle as well as by expiry time. *)
module Timers = struct
- type handle = int
type 'a entry =
{
- handle: handle;
- expires_at: float;
+ handle : int;
+ mutable expires_at: float;
value: 'a;
}
- module Timers_by_handle = Map.Make (struct type t = handle let compare = compare end)
module Timers_by_expiry = Map.Make (struct type t = float let compare = compare end)
type 'a t =
{
- mutable by_handle: ('a entry) Timers_by_handle.t;
mutable by_expiry: (('a entry) list) Timers_by_expiry.t;
}
- let create () = { by_handle = Timers_by_handle.empty;
- by_expiry = Timers_by_expiry.empty }
+ let create () = { by_expiry = Timers_by_expiry.empty }
- let is_empty t = Timers_by_handle.is_empty t.by_handle
+ let is_empty t = Timers_by_expiry.is_empty t.by_expiry
let next_handle = ref 0
+ (** inserts an existing (but not inserted) entry in the map *)
+ let submit_timer t at e =
+ e.expires_at <- at;
+ let es = try Timers_by_expiry.find e.expires_at t.by_expiry with Not_found -> [] in
+ t.by_expiry <- Timers_by_expiry.add e.expires_at (e :: es) t.by_expiry
+
let add_timer t at v =
incr next_handle;
let e = { handle = !next_handle; expires_at = at; value = v } in
- t.by_handle <- Timers_by_handle.add e.handle e t.by_handle;
- let es = try Timers_by_expiry.find e.expires_at t.by_expiry with Not_found -> [] in
- t.by_expiry <- Timers_by_expiry.add e.expires_at (e :: es) t.by_expiry;
- e.handle
+ submit_timer t at e;
+ e
- let remove_timer t handle =
- let e = Timers_by_handle.find handle t.by_handle in
- let es = Timers_by_expiry.find e.expires_at t.by_expiry in
+ let remove_timer t entry =
+ let handle = entry.handle in
+ let es = Timers_by_expiry.find entry.expires_at t.by_expiry in
let es = List.filter (fun e' -> e'.handle <> handle) es in
- t.by_handle <- Timers_by_handle.remove handle t.by_handle;
t.by_expiry <- (match es with
- | [] -> Timers_by_expiry.remove e.expires_at t.by_expiry
- | _ -> Timers_by_expiry.add e.expires_at es t.by_expiry
+ | [] -> Timers_by_expiry.remove entry.expires_at t.by_expiry
+ | _ -> Timers_by_expiry.add entry.expires_at es t.by_expiry
)
exception Found of float
raise Not_found
with Found tim -> tim
+ (* Extracts the timers for time t, and return a list of values for those timers *)
let extract_timers_at t tim =
try
let es = Timers_by_expiry.find tim t.by_expiry in
t.by_expiry <- Timers_by_expiry.remove tim t.by_expiry;
- t.by_handle <- List.fold_left (fun byh e ->
- Timers_by_handle.remove e.handle byh
- ) t.by_handle es;
List.map (fun e -> e.value) es
with Not_found -> []
- let num_timers t =
- let cnt = ref 0 in
- Timers_by_handle.iter (fun _ _ -> incr cnt) t.by_handle;
- !cnt
end
-type timer_callbacks =
-{
- expiry_callback : unit -> unit
-}
-
type error = Unix.error * string * string
type handle = Unix.file_descr
and t =
{
mutable conns: conn_state ConnMap.t;
- mutable timers: timer_callbacks Timers.t;
+ mutable timers: (unit -> unit) Timers.t;
(* select state *)
readers: Unixext.Fdset.t;
writers: Unixext.Fdset.t;
(* dispatch state *)
mutable d_readers: Unixext.Fdset.t;
mutable d_writers: Unixext.Fdset.t;
+ (** Unix.gettimeofday() at the time the loop iteration started *)
+ mutable current_time: float;
}
let create () =
excepts = Unixext.Fdset.create ();
d_readers = Unixext.Fdset.create ();
d_writers = Unixext.Fdset.create ();
+ current_time = 0.0;
}
-let num_connections t =
- let cnt = ref 0 in
- ConnMap.iter (fun _ _ -> incr cnt) t.conns;
- !cnt
-
-let num_timers t = Timers.num_timers t.timers
-
(* connections *)
let register_conn t fd ?(enable_send_done=false) ?(enable_recv=true) callbacks =
let conn_state = ConnMap.find handle t.conns in
conn_state.callbacks <- callbacks
+let has_connections t = not (ConnMap.is_empty t.conns)
+
(* timers *)
-type timer = Timers.handle
+type timer = (unit -> unit) Timers.entry
-let start_timer t interval callback =
- let at = Unix.gettimeofday () +. interval in
- Timers.add_timer t.timers at callback
+let start_timer t time_offset_sec cb =
+ let at = Unix.gettimeofday () +. time_offset_sec in
+ Timers.add_timer t.timers at cb
+let start_periodic_timer t time_offset_sec period cb =
+ let orig_timer = ref (None: timer option) in
+ let resubmit_timer_closure () =
+ let orig_timer = match !orig_timer with None -> raise Not_found | Some x -> x in
+ Timers.submit_timer t.timers (t.current_time +. period) orig_timer;
+ cb (); (* invoke the user's callback *)
+ in
+ let new_timer = start_timer t time_offset_sec resubmit_timer_closure in
+ orig_timer := Some (new_timer);
+ new_timer
+
let cancel_timer t timer =
Timers.remove_timer t.timers timer
+let has_timers t = not (Timers.is_empty t.timers)
+
(* event dispatch *)
let buf = String.create 512
cs.callbacks.send_done_callback t fd
end
-let dispatch_timers t current_time =
+let dispatch_timers t =
let break = ref false in
- while (not (Timers.is_empty t.timers) && not !break) do
+ while ((not (Timers.is_empty t.timers)) && (not !break)) do
let first_expired = Timers.get_first_expiry_time t.timers in
- if first_expired > current_time then
+ if first_expired > t.current_time then
break := true
else begin
let cbs = Timers.extract_timers_at t.timers first_expired in
- List.iter (fun cb -> cb.expiry_callback ()) cbs
+ List.iter (fun cb -> cb ()) cbs
end
done
+let timers_empty t = Timers.is_empty t.timers
+
let dispatch t interval =
- let ctime = Unix.gettimeofday () in
+ t.current_time <- Unix.gettimeofday ();
let interval =
if Timers.is_empty t.timers then interval
else
interval before which the earliest timer
expires.
*)
- let block_until = if interval > 0.0 then ctime +. interval else ctime in
+ let block_until = if interval > 0.0 then t.current_time +. interval else t.current_time in
let first_expiry = Timers.get_first_expiry_time t.timers in
let block_until = (if first_expiry < block_until then first_expiry else block_until) in
- let interval = block_until -. ctime in
+ let interval = block_until -. t.current_time in
if interval < 0.0 then 0.0 else interval
in
let events =
) t.conns
| None -> ()
);
-
- dispatch_timers t ctime
+ dispatch_timers t
type timer
-type timer_callbacks =
-{
- expiry_callback : unit -> unit
-}
+(** Starts a timer that will fire once only, and return a handle to
+ this timer, so that it can be cancelled before it fires. The timer
+ is automatically cancelled once it has fired.
+*)
+val start_timer : t -> float (* offset, secs *) -> (unit -> unit) -> timer
+
+(** Starts a timer that will fire periodically. The timer needs
+ explicit cancellation.
+*)
+val start_periodic_timer: t -> float (* offset from current time, secs *) -> float (* period, secs *) -> (unit -> unit) -> timer
-(* currently, periodic timers are not supported. *)
-val start_timer : t -> float (* in seconds *) -> timer_callbacks -> timer
+(** Allows cancelling a timer before it fires. Non-periodic timers
+ are implicitly cancelled when their timer fires. Periodic timers
+ however need explicit cancellation.
+*)
val cancel_timer : t -> timer -> unit
(* Event loop management *)
-val num_connections : t -> int
-val num_timers : t -> int
+val has_timers : t -> bool
+
+val has_connections : t -> bool
+