type t =
{
backend: backend;
- pkt_in: Packet.t Queue.t;
pkt_out: Packet.t Queue.t;
mutable partial_in: partial_buf;
mutable partial_out: string;
Xs_ring.close backend.mmap;
backend.eventchn_notify ();
(* Clear our old connection state *)
- Queue.clear t.pkt_in;
Queue.clear t.pkt_out;
t.partial_in <- init_partial_in ();
t.partial_out <- ""
(* NB: can throw Reconnect *)
let input con =
- let newpacket = ref false in
let to_read =
match con.partial_in with
| HaveHdr partial_pkt -> Partial.to_complete partial_pkt
if Partial.to_complete partial_pkt = 0 then (
let pkt = Packet.of_partialpkt partial_pkt in
con.partial_in <- init_partial_in ();
- Queue.push pkt con.pkt_in;
- newpacket := true
- )
+ Some pkt
+ ) else None
| NoHdr (i, buf) ->
(* we complete the partial header *)
if sz > 0 then
Bytes.blit b 0 buf (Partial.header_size () - i) sz;
con.partial_in <- if sz = i then
- HaveHdr (Partial.of_string (Bytes.to_string buf)) else NoHdr (i - sz, buf)
- );
- !newpacket
+ HaveHdr (Partial.of_string (Bytes.to_string buf)) else NoHdr (i - sz, buf);
+ None
+ )
let newcon backend = {
backend = backend;
- pkt_in = Queue.create ();
pkt_out = Queue.create ();
partial_in = init_partial_in ();
partial_out = "";
let peek_output con = Queue.peek con.pkt_out
-let input_len con = Queue.length con.pkt_in
-let has_in_packet con = Queue.length con.pkt_in > 0
-let get_in_packet con = Queue.pop con.pkt_in
let has_partial_input con = match con.partial_in with
| HaveHdr _ -> true
| NoHdr (n, _) -> n < Partial.header_size ()
val write_mmap : backend_mmap -> 'a -> string -> int -> int
val write : t -> string -> int -> int
val output : t -> bool
-val input : t -> bool
+val input : t -> Packet.t option
val newcon : backend -> t
val open_fd : Unix.file_descr -> t
val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
val has_old_output : t -> bool
val has_output : t -> bool
val peek_output : t -> Packet.t
-val input_len : t -> int
-val has_in_packet : t -> bool
val has_partial_input : t -> bool
-val get_in_packet : t -> Packet.t
val has_more_input : t -> bool
val is_selectable : t -> bool
val get_fd : t -> Unix.file_descr
done
(* receive one packet - can sleep *)
-let pkt_recv con =
- let workdone = ref false in
- while not !workdone
- do
- workdone := Xb.input con.xb
- done;
- Xb.get_in_packet con.xb
+let rec pkt_recv con =
+ match Xb.input con.xb with
+ | Some packet -> packet
+ | None -> pkt_recv con
let pkt_recv_timeout con timeout =
let fd = Xb.get_fd con.xb in
let r, _, _ = Unix.select [ fd ] [] [] timeout in
if r = [] then
true, None
- else (
- let workdone = Xb.input con.xb in
- if workdone then
- false, (Some (Xb.get_in_packet con.xb))
- else
- false, None
- )
+ else
+ false, Xb.input con.xb
let queue_watchevent con data =
let ls = split_string ~limit:2 '\000' data in
Hashtbl.find con.transactions tid
let do_input con = Xenbus.Xb.input con.xb
-let has_input con = Xenbus.Xb.has_in_packet con.xb
-let pop_in con = Xenbus.Xb.get_in_packet con.xb
let has_more_input con = Xenbus.Xb.has_more_input con.xb
let has_output con = Xenbus.Xb.has_output con.xb
info "%s requests a reconnect" (Connection.get_domstr con);
History.reconnect con;
info "%s reconnection complete" (Connection.get_domstr con);
- false
+ None
| Failure exp ->
error "caught exception %s" exp;
error "got a bad client %s" (sprintf "%-8s" (Connection.get_domstr con));
Connection.mark_as_bad con;
- false
+ None
in
- if newpacket then (
- let packet = Connection.pop_in con in
+ match newpacket with
+ | None -> ()
+ | Some packet ->
let tid, rid, ty, data = Xenbus.Xb.Packet.unpack packet in
let req = {Packet.tid=tid; Packet.rid=rid; Packet.ty=ty; Packet.data=data} in
(Xenbus.Xb.Op.to_string ty) (sanitize_data data); *)
process_packet ~store ~cons ~doms ~con ~req;
write_access_log ~ty ~tid ~con:(Connection.get_domstr con) ~data;
- Connection.incr_ops con;
- )
+ Connection.incr_ops con
let do_output _store _cons _doms con =
if Connection.has_output con then (