type handle = Unix.file_descr
let handle_compare = compare
+let handle_hash h = Unixext.int_of_file_descr h
type conn_status =
| Connecting
{
accept_callback : t -> handle -> Unix.file_descr -> Unix.sockaddr -> unit;
connect_callback : t -> handle -> unit;
- recv_callback : t -> handle -> string -> (* offset *) int -> (* length *) int -> unit;
- send_done_callback : t -> handle -> unit;
- shutdown_callback : t -> handle -> unit;
error_callback : t -> handle -> error -> unit;
+ recv_ready_callback : t -> handle -> Unix.file_descr -> unit;
+ send_ready_callback : t -> handle -> Unix.file_descr -> unit;
}
and conn_state =
{
mutable callbacks : conn_callbacks;
mutable status : conn_status;
- mutable send_done_enabled : bool;
+ mutable send_enabled : bool;
mutable recv_enabled : bool;
-
- send_buf : Buffer.t;
}
and t =
(* connections *)
-let register_conn t fd ?(enable_send_done=false) ?(enable_recv=true) callbacks =
+let register_conn t fd ?(enable_send=false) ?(enable_recv=true) callbacks =
let conn_state = { callbacks = callbacks;
status = Connected;
- send_done_enabled = enable_send_done;
+ send_enabled = enable_send;
recv_enabled = enable_recv;
- send_buf = Buffer.create 16;
}
in
t.conns <- ConnMap.add fd conn_state t.conns;
conn_state.recv_enabled <- true;
conn_state.status <- Listening
-let enable_send_done t handle =
+let enable_send t handle =
let conn_state = ConnMap.find handle t.conns in
- conn_state.send_done_enabled <- true
+ conn_state.send_enabled <- true;
+ if conn_state.status = Connected then
+ Unixext.Fdset.set t.writers handle
-let disable_send_done t handle =
+let disable_send t handle =
let conn_state = ConnMap.find handle t.conns in
- conn_state.send_done_enabled <- false
+ conn_state.send_enabled <- false;
+ if conn_state.status = Connected then
+ Unixext.Fdset.clear t.writers handle
let enable_recv t handle =
let conn_state = ConnMap.find handle t.conns in
if conn_state.status = Connected then
Unixext.Fdset.clear t.readers handle
-let send t handle s =
- let conn_state = ConnMap.find handle t.conns in
- Buffer.add_string conn_state.send_buf s;
- Unixext.Fdset.set t.writers handle
-
-let has_pending_send t handle =
- let conn_state = ConnMap.find handle t.conns in
- Buffer.length conn_state.send_buf > 0
-
let set_callbacks t handle callbacks =
let conn_state = ConnMap.find handle t.conns in
conn_state.callbacks <- callbacks
(* event dispatch *)
-let buf = String.create 512
-let buflen = String.length buf
-
let dispatch_read t fd cs =
match cs.status with
| Connecting ->
cs.callbacks.error_callback t fd (ec, f, s)
)
| Connected ->
- if cs.recv_enabled then
- try
- let read_bytes = Unix.read fd buf 0 buflen in
- if read_bytes = 0 then
- cs.callbacks.shutdown_callback t fd
- else begin
- dbg "<- %s" (String.sub buf 0 read_bytes);
- cs.callbacks.recv_callback t fd buf 0 read_bytes
- end
- with
- | Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
- | Unix.Unix_error (Unix.EAGAIN, _, _)
- | Unix.Unix_error (Unix.EINTR, _, _) ->
- ()
- | Unix.Unix_error (ec, f, s) ->
- cs.callbacks.error_callback t fd (ec, f, s)
- else
- Unixext.Fdset.clear t.readers fd
-let do_send t fd cs =
- let payload = Buffer.contents cs.send_buf in
- let payload_len = String.length payload in
- try
- (match Unix.write fd payload 0 payload_len with
- | 0 -> ()
- | sent ->
- dbg "-> %s" (String.sub payload 0 sent);
- Buffer.clear cs.send_buf;
- Buffer.add_substring cs.send_buf payload sent (payload_len - sent)
- )
- with
- | Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
- | Unix.Unix_error (Unix.EAGAIN, _, _)
- | Unix.Unix_error (Unix.EINTR, _, _) ->
- ()
- | Unix.Unix_error (ec, f, s) ->
- cs.callbacks.error_callback t fd (ec, f, s)
+ if cs.recv_enabled
+ then cs.callbacks.recv_ready_callback t fd fd
+ else Unixext.Fdset.clear t.readers fd
let dispatch_write t fd cs =
match cs.status with
writes, we disable the write watch. *)
Unixext.Fdset.clear t.writers fd
| Connected ->
- do_send t fd cs;
- if Buffer.length cs.send_buf = 0 then begin
- Unixext.Fdset.clear t.writers fd;
- if cs.send_done_enabled then
- cs.callbacks.send_done_callback t fd
- end
+ if cs.send_enabled
+ then cs.callbacks.send_ready_callback t fd fd
+ else Unixext.Fdset.clear t.writers fd
let dispatch_timers t =
let break = ref false in
{
accept_callback : t -> handle -> Unix.file_descr -> Unix.sockaddr -> unit;
connect_callback : t -> handle -> unit;
- recv_callback : t -> handle -> string -> (* offset *) int -> (* length *) int -> unit;
- send_done_callback : t -> handle -> unit;
- shutdown_callback : t -> handle -> unit;
error_callback : t -> handle -> error -> unit;
+ recv_ready_callback : t -> handle -> Unix.file_descr -> unit;
+ send_ready_callback : t -> handle -> Unix.file_descr -> unit;
}
(* this is to allow collections indexed by connection handles. *)
val handle_compare : handle -> handle -> int
+val handle_hash : handle -> int
(* Connection Management *)
-(* by default, connections are disabled for the send_done callback, and enabled for all others. *)
-val register_conn : t -> Unix.file_descr -> ?enable_send_done:bool -> ?enable_recv:bool -> conn_callbacks -> handle
+(* by default, notifications for incoming data are disabled, and enabled for all others. *)
+val register_conn : t -> Unix.file_descr -> ?enable_send:bool -> ?enable_recv:bool -> conn_callbacks -> handle
val remove_conn : t -> handle -> unit
val get_fd : t -> handle -> Unix.file_descr
val connect : t -> handle -> Unix.sockaddr -> unit
val listen : t -> handle -> unit
-val enable_send_done : t -> handle -> unit
-val disable_send_done : t -> handle -> unit
+val enable_send : t -> handle -> unit
+val disable_send : t -> handle -> unit
val enable_recv : t -> handle -> unit
val disable_recv : t -> handle -> unit
-val send : t -> handle -> string -> unit
-val has_pending_send : t -> handle -> bool
-
val set_callbacks : t -> handle -> conn_callbacks -> unit
-
(* Timers *)
type timer
val has_timers : t -> bool
val has_connections : t -> bool
-
-
-