From 78b65082a8735679e5a62a90e06cac2bd1fb1632 Mon Sep 17 00:00:00 2001 From: Prashanth Mundkur Date: Tue, 30 Jun 2009 12:47:26 -0700 Subject: [PATCH] [eventloop] expose a lower-level API to allow integration with D-Bus and SSL connections. --- common/connection_table.ml | 3 ++ libs/stdext/eventloop.ml | 87 +++++++++----------------------------- libs/stdext/eventloop.mli | 21 +++------ 3 files changed, 30 insertions(+), 81 deletions(-) diff --git a/common/connection_table.ml b/common/connection_table.ml index 862706e..53d185c 100644 --- a/common/connection_table.ml +++ b/common/connection_table.ml @@ -35,6 +35,9 @@ struct let get_conn h = ConnectionMap.find h !conns + let has_conn h = + ConnectionMap.mem h !conns + let remove_conn h = conns := ConnectionMap.remove h !conns diff --git a/libs/stdext/eventloop.ml b/libs/stdext/eventloop.ml index b447e2a..b4337fe 100644 --- a/libs/stdext/eventloop.ml +++ b/libs/stdext/eventloop.ml @@ -92,6 +92,7 @@ type error = Unix.error * string * string type handle = Unix.file_descr let handle_compare = compare +let handle_hash h = Unixext.int_of_file_descr h type conn_status = | Connecting @@ -102,20 +103,17 @@ type conn_callbacks = { accept_callback : t -> handle -> Unix.file_descr -> Unix.sockaddr -> unit; connect_callback : t -> handle -> unit; - recv_callback : t -> handle -> string -> (* offset *) int -> (* length *) int -> unit; - send_done_callback : t -> handle -> unit; - shutdown_callback : t -> handle -> unit; error_callback : t -> handle -> error -> unit; + recv_ready_callback : t -> handle -> Unix.file_descr -> unit; + send_ready_callback : t -> handle -> Unix.file_descr -> unit; } and conn_state = { mutable callbacks : conn_callbacks; mutable status : conn_status; - mutable send_done_enabled : bool; + mutable send_enabled : bool; mutable recv_enabled : bool; - - send_buf : Buffer.t; } and t = @@ -146,12 +144,11 @@ let create () = (* connections *) -let register_conn t fd ?(enable_send_done=false) ?(enable_recv=true) callbacks = +let register_conn t fd ?(enable_send=false) ?(enable_recv=true) callbacks = let conn_state = { callbacks = callbacks; status = Connected; - send_done_enabled = enable_send_done; + send_enabled = enable_send; recv_enabled = enable_recv; - send_buf = Buffer.create 16; } in t.conns <- ConnMap.add fd conn_state t.conns; @@ -192,13 +189,17 @@ let listen t handle = conn_state.recv_enabled <- true; conn_state.status <- Listening -let enable_send_done t handle = +let enable_send t handle = let conn_state = ConnMap.find handle t.conns in - conn_state.send_done_enabled <- true + conn_state.send_enabled <- true; + if conn_state.status = Connected then + Unixext.Fdset.set t.writers handle -let disable_send_done t handle = +let disable_send t handle = let conn_state = ConnMap.find handle t.conns in - conn_state.send_done_enabled <- false + conn_state.send_enabled <- false; + if conn_state.status = Connected then + Unixext.Fdset.clear t.writers handle let enable_recv t handle = let conn_state = ConnMap.find handle t.conns in @@ -212,15 +213,6 @@ let disable_recv t handle = if conn_state.status = Connected then Unixext.Fdset.clear t.readers handle -let send t handle s = - let conn_state = ConnMap.find handle t.conns in - Buffer.add_string conn_state.send_buf s; - Unixext.Fdset.set t.writers handle - -let has_pending_send t handle = - let conn_state = ConnMap.find handle t.conns in - Buffer.length conn_state.send_buf > 0 - let set_callbacks t handle callbacks = let conn_state = ConnMap.find handle t.conns in conn_state.callbacks <- callbacks @@ -256,9 +248,6 @@ let has_timers t = not (Timers.is_empty t.timers) (* event dispatch *) -let buf = String.create 512 -let buflen = String.length buf - let dispatch_read t fd cs = match cs.status with | Connecting -> @@ -284,42 +273,9 @@ let dispatch_read t fd cs = cs.callbacks.error_callback t fd (ec, f, s) ) | Connected -> - if cs.recv_enabled then - try - let read_bytes = Unix.read fd buf 0 buflen in - if read_bytes = 0 then - cs.callbacks.shutdown_callback t fd - else begin - dbg "<- %s" (String.sub buf 0 read_bytes); - cs.callbacks.recv_callback t fd buf 0 read_bytes - end - with - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EINTR, _, _) -> - () - | Unix.Unix_error (ec, f, s) -> - cs.callbacks.error_callback t fd (ec, f, s) - else - Unixext.Fdset.clear t.readers fd -let do_send t fd cs = - let payload = Buffer.contents cs.send_buf in - let payload_len = String.length payload in - try - (match Unix.write fd payload 0 payload_len with - | 0 -> () - | sent -> - dbg "-> %s" (String.sub payload 0 sent); - Buffer.clear cs.send_buf; - Buffer.add_substring cs.send_buf payload sent (payload_len - sent) - ) - with - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EINTR, _, _) -> - () - | Unix.Unix_error (ec, f, s) -> - cs.callbacks.error_callback t fd (ec, f, s) + if cs.recv_enabled + then cs.callbacks.recv_ready_callback t fd fd + else Unixext.Fdset.clear t.readers fd let dispatch_write t fd cs = match cs.status with @@ -342,12 +298,9 @@ let dispatch_write t fd cs = writes, we disable the write watch. *) Unixext.Fdset.clear t.writers fd | Connected -> - do_send t fd cs; - if Buffer.length cs.send_buf = 0 then begin - Unixext.Fdset.clear t.writers fd; - if cs.send_done_enabled then - cs.callbacks.send_done_callback t fd - end + if cs.send_enabled + then cs.callbacks.send_ready_callback t fd fd + else Unixext.Fdset.clear t.writers fd let dispatch_timers t = let break = ref false in diff --git a/libs/stdext/eventloop.mli b/libs/stdext/eventloop.mli index 8f11007..edaa2a4 100644 --- a/libs/stdext/eventloop.mli +++ b/libs/stdext/eventloop.mli @@ -27,37 +27,33 @@ type conn_callbacks = { accept_callback : t -> handle -> Unix.file_descr -> Unix.sockaddr -> unit; connect_callback : t -> handle -> unit; - recv_callback : t -> handle -> string -> (* offset *) int -> (* length *) int -> unit; - send_done_callback : t -> handle -> unit; - shutdown_callback : t -> handle -> unit; error_callback : t -> handle -> error -> unit; + recv_ready_callback : t -> handle -> Unix.file_descr -> unit; + send_ready_callback : t -> handle -> Unix.file_descr -> unit; } (* this is to allow collections indexed by connection handles. *) val handle_compare : handle -> handle -> int +val handle_hash : handle -> int (* Connection Management *) -(* by default, connections are disabled for the send_done callback, and enabled for all others. *) -val register_conn : t -> Unix.file_descr -> ?enable_send_done:bool -> ?enable_recv:bool -> conn_callbacks -> handle +(* by default, notifications for incoming data are disabled, and enabled for all others. *) +val register_conn : t -> Unix.file_descr -> ?enable_send:bool -> ?enable_recv:bool -> conn_callbacks -> handle val remove_conn : t -> handle -> unit val get_fd : t -> handle -> Unix.file_descr val connect : t -> handle -> Unix.sockaddr -> unit val listen : t -> handle -> unit -val enable_send_done : t -> handle -> unit -val disable_send_done : t -> handle -> unit +val enable_send : t -> handle -> unit +val disable_send : t -> handle -> unit val enable_recv : t -> handle -> unit val disable_recv : t -> handle -> unit -val send : t -> handle -> string -> unit -val has_pending_send : t -> handle -> bool - val set_callbacks : t -> handle -> conn_callbacks -> unit - (* Timers *) type timer @@ -96,6 +92,3 @@ val dispatch : t -> float -> unit val has_timers : t -> bool val has_connections : t -> bool - - - -- 2.39.5