]> xenbits.xensource.com Git - xen.git/commitdiff
tools/ocaml: Limit maximum in-flight requests / outstanding replies
authorEdwin Török <edvin.torok@citrix.com>
Wed, 12 Oct 2022 18:13:04 +0000 (19:13 +0100)
committerAndrew Cooper <andrew.cooper3@citrix.com>
Tue, 1 Nov 2022 15:20:41 +0000 (15:20 +0000)
Introduce a limit on the number of outstanding reply packets in the xenbus
queue.  This limits the number of in-flight requests: when the output queue is
full we'll stop processing inputs until the output queue has room again.

To avoid a busy loop on the Unix socket we only add it to the watched input
file descriptor set if we'd be able to call `input` on it.  Even though Dom0
is trusted and exempt from quotas a flood of events might cause a backlog
where events are produced faster than daemons in Dom0 can consume them, which
could lead to an unbounded queue size and OOM.

Therefore the xenbus queue limit must apply to all connections, Dom0 is not
exempt from it, although if everything works correctly it will eventually
catch up.

This prevents a malicious guest from sending more commands while it has
outstanding watch events or command replies in its input ring.  However if it
can cause the generation of watch events by other means (e.g. by Dom0, or
another cooperative guest) and stop reading its own ring then watch events
would've queued up without limit.

The xenstore protocol doesn't have a back-pressure mechanism, and doesn't
allow dropping watch events.  In fact, dropping watch events is known to break
some pieces of normal functionality.  This leaves little choice to safely
implement the xenstore protocol without exposing the xenstore daemon to
out-of-memory attacks.

Implement the fix as pipes with bounded buffers:
* Use a bounded buffer for watch events
* The watch structure will have a bounded receiving pipe of watch events
* The source will have an "overflow" pipe of pending watch events it couldn't
  deliver

Items are queued up on one end and are sent as far along the pipe as possible:

  source domain -> watch -> xenbus of target -> xenstore ring/socket of target

If the pipe is "full" at any point then back-pressure is applied and we prevent
more items from being queued up.  For the source domain this means that we'll
stop accepting new commands as long as its pipe buffer is not empty.

Before we try to enqueue an item we first check whether it is possible to send
it further down the pipe, by attempting to recursively flush the pipes. This
ensures that we retain the order of events as much as possible.

We might break causality of watch events if the target domain's queue is full
and we need to start using the watch's queue.  This is a breaking change in
the xenstore protocol, but only for domains which are not processing their
incoming ring as expected.

When a watch is deleted its entire pending queue is dropped (no code is needed
for that, because it is part of the 'watch' type).

There is a cache of watches that have pending events that we attempt to flush
at every cycle if possible.

Introduce 3 limits here:
* quota-maxwatchevents on watch event destination: when this is hit the
  source will not be allowed to queue up more watch events.
* quota-maxoustanding which is the number of responses not read from the ring:
  once exceeded, no more inputs are processed until all outstanding replies
  are consumed by the client.
* overflow queue on the watch event source: all watches that cannot be stored
  on destination are queued up here, a single command can trigger multiple
  watches (e.g. due to recursion).

The overflow queue currently doesn't have an upper bound, it is difficult to
accurately calculate one as it depends on whether you are Dom0 and how many
watches each path has registered and how many watch events you can trigger
with a single command (e.g. a commit).  However these events were already
using memory, this just moves them elsewhere, and as long as we correctly
block a domain it shouldn't result in unbounded memory usage.

Note that Dom0 is not excluded from these checks, it is important that Dom0 is
especially not excluded when it is the source, since there are many ways in
which a guest could trigger Dom0 to send it watch events.

This should protect against malicious frontends as long as the backend follows
the PV xenstore protocol and only exposes paths needed by the frontend, and
changes those paths at most once as a reaction to guest events, or protocol
state.

The queue limits are per watch, and per domain-pair, so even if one
communication channel would be "blocked", others would keep working, and the
domain itself won't get blocked as long as it doesn't overflow the queue of
watch events.

Similarly a malicious backend could cause the frontend to get blocked, but
this watch queue protects the frontend as well as long as it follows the PV
protocol.  (Although note that protection against malicious backends is only a
best effort at the moment)

This is part of XSA-326 / CVE-2022-42318.

Signed-off-by: Edwin Török <edvin.torok@citrix.com>
Acked-by: Christian Lindig <christian.lindig@citrix.com>
(cherry picked from commit 9284ae0c40fb5b9606947eaaec23dc71d0540e96)

tools/ocaml/libs/xb/xb.ml
tools/ocaml/libs/xb/xb.mli
tools/ocaml/libs/xs/queueop.ml
tools/ocaml/libs/xs/xsraw.ml
tools/ocaml/xenstored/connection.ml
tools/ocaml/xenstored/connections.ml
tools/ocaml/xenstored/define.ml
tools/ocaml/xenstored/oxenstored.conf.in
tools/ocaml/xenstored/process.ml
tools/ocaml/xenstored/xenstored.ml

index 4197a3888a687c5179baf247f6b52e0f0eeca478..b292ed7a874da0255b73153b96c9fe407c4a797e 100644 (file)
@@ -134,14 +134,44 @@ type backend = Fd of backend_fd | Xenmmap of backend_mmap
 
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
 
+(*
+       separate capacity reservation for replies and watch events:
+       this allows a domain to keep working even when under a constant flood of
+       watch events
+*)
+type capacity = { maxoutstanding: int; maxwatchevents: int }
+
+module Queue = BoundedQueue
+
+type packet_class =
+       | CommandReply
+       | Watchevent
+
+let string_of_packet_class = function
+       | CommandReply -> "command_reply"
+       | Watchevent -> "watch_event"
+
 type t =
 {
        backend: backend;
-       pkt_out: Packet.t Queue.t;
+       pkt_out: (Packet.t, packet_class) Queue.t;
        mutable partial_in: partial_buf;
        mutable partial_out: string;
+       capacity: capacity
 }
 
+let to_read con =
+       match con.partial_in with
+               | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
+               | NoHdr   (i, _)    -> i
+
+let debug t =
+       Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s"
+               (to_read t)
+               (String.length t.partial_out)
+               (Queue.length t.pkt_out)
+               (BoundedQueue.debug string_of_packet_class t.pkt_out)
+
 let init_partial_in () = NoHdr
        (Partial.header_size (), Bytes.make (Partial.header_size()) '\000')
 
@@ -199,7 +229,8 @@ let output con =
        let s = if String.length con.partial_out > 0 then
                        con.partial_out
                else if Queue.length con.pkt_out > 0 then
-                       Packet.to_string (Queue.pop con.pkt_out)
+                       let pkt = Queue.pop con.pkt_out in
+                       Packet.to_string pkt
                else
                        "" in
        (* send data from s, and save the unsent data to partial_out *)
@@ -212,12 +243,15 @@ let output con =
        (* after sending one packet, partial is empty *)
        con.partial_out = ""
 
+(* we can only process an input packet if we're guaranteed to have room
+   to store the response packet *)
+let can_input con = Queue.can_push con.pkt_out CommandReply
+
 (* NB: can throw Reconnect *)
 let input con =
-       let to_read =
-               match con.partial_in with
-               | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
-               | NoHdr   (i, _)    -> i in
+       if not (can_input con) then None
+       else
+       let to_read = to_read con in
 
        (* try to get more data from input stream *)
        let b = Bytes.make to_read '\000' in
@@ -243,11 +277,22 @@ let input con =
                None
        )
 
-let newcon backend = {
+let classify t =
+       match t.Packet.ty with
+       | Op.Watchevent -> Watchevent
+       | _ -> CommandReply
+
+let newcon ~capacity backend =
+       let limit = function
+               | CommandReply -> capacity.maxoutstanding
+               | Watchevent -> capacity.maxwatchevents
+       in
+       {
        backend = backend;
-       pkt_out = Queue.create ();
+       pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit;
        partial_in = init_partial_in ();
        partial_out = "";
+       capacity = capacity;
        }
 
 let open_fd fd = newcon (Fd { fd = fd; })
index 91c682162cea207810512d7dde48ab0f3c02ba9f..71b2754ca7880a6b645c104ca55cf82ff4877ae1 100644 (file)
@@ -66,10 +66,11 @@ type backend_mmap = {
 type backend_fd = { fd : Unix.file_descr; }
 type backend = Fd of backend_fd | Xenmmap of backend_mmap
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
+type capacity = { maxoutstanding: int; maxwatchevents: int }
 type t
 val init_partial_in : unit -> partial_buf
 val reconnect : t -> unit
-val queue : t -> Packet.t -> unit
+val queue : t -> Packet.t -> unit option
 val read_fd : backend_fd -> 'a -> bytes -> int -> int
 val read_mmap : backend_mmap -> 'a -> bytes -> int -> int
 val read : t -> bytes -> int -> int
@@ -78,13 +79,14 @@ val write_mmap : backend_mmap -> 'a -> string -> int -> int
 val write : t -> string -> int -> int
 val output : t -> bool
 val input : t -> Packet.t option
-val newcon : backend -> t
-val open_fd : Unix.file_descr -> t
-val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
+val newcon : capacity:capacity -> backend -> t
+val open_fd : Unix.file_descr -> capacity:capacity -> t
+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t
 val close : t -> unit
 val is_fd : t -> bool
 val is_mmap : t -> bool
 val output_len : t -> int
+val can_input: t -> bool
 val has_new_output : t -> bool
 val has_old_output : t -> bool
 val has_output : t -> bool
@@ -93,3 +95,4 @@ val has_partial_input : t -> bool
 val has_more_input : t -> bool
 val is_selectable : t -> bool
 val get_fd : t -> Unix.file_descr
+val debug: t -> string
index 9ff5bbd529ce4130069679b234b31fb52815f223..4e532cdaeacba5f5c40bff66adfb21c6c25ffa9f 100644 (file)
 open Xenbus
 
 let data_concat ls = (String.concat "\000" ls) ^ "\000"
+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None)
 let queue_path ty (tid: int) (path: string) con =
        let data = data_concat [ path; ] in
-       Xb.queue con (Xb.Packet.create tid 0 ty data)
+       queue con (Xb.Packet.create tid 0 ty data)
 
 (* operations *)
 let directory tid path con = queue_path Xb.Op.Directory tid path con
@@ -27,48 +28,48 @@ let read tid path con = queue_path Xb.Op.Read tid path con
 let getperms tid path con = queue_path Xb.Op.Getperms tid path con
 
 let debug commands con =
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
+       queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
 
 let watch path data con =
        let data = data_concat [ path; data; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
 
 let unwatch path data con =
        let data = data_concat [ path; data; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
 
 let transaction_start con =
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
+       queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
 
 let transaction_end tid commit con =
        let data = data_concat [ (if commit then "T" else "F"); ] in
-       Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
+       queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
 
 let introduce domid mfn port con =
        let data = data_concat [ Printf.sprintf "%u" domid;
                                 Printf.sprintf "%nu" mfn;
                                 string_of_int port; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
 
 let release domid con =
        let data = data_concat [ Printf.sprintf "%u" domid; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
 
 let resume domid con =
        let data = data_concat [ Printf.sprintf "%u" domid; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
 
 let getdomainpath domid con =
        let data = data_concat [ Printf.sprintf "%u" domid; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
 
 let write tid path value con =
        let data = path ^ "\000" ^ value (* no NULL at the end *) in
-       Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
+       queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
 
 let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con
 let rm tid path con = queue_path Xb.Op.Rm tid path con
 
 let setperms tid path perms con =
        let data = data_concat [ path; perms ] in
-       Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
+       queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
index 451f8b38dbcc18d65ec9ef0373ef0bbbaecc590f..cbd17280600c1cde207901c3936ce710ca145e7b 100644 (file)
@@ -36,8 +36,10 @@ type con = {
 let close con =
        Xb.close con.xb
 
+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; }
+
 let open_fd fd = {
-       xb = Xb.open_fd fd;
+       xb = Xb.open_fd ~capacity fd;
        watchevents = Queue.create ();
 }
 
index ace2aa5b4f53455b7e96cd02dad962672bd0a042..9aad451a2dbd758dea3645240faf138f6fd120e0 100644 (file)
@@ -20,12 +20,84 @@ open Stdext
 
 let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
 
+type 'a bounded_sender = 'a -> unit option
+(** a bounded sender accepts an ['a] item and returns:
+    None - if there is no room to accept the item
+    Some () -  if it has successfully accepted/sent the item
+ *)
+
+module BoundedPipe : sig
+       type 'a t
+
+       (** [create ~capacity ~destination] creates a bounded pipe with a
+           local buffer holding at most [capacity] items.  Once the buffer is
+           full it will not accept further items.  items from the pipe are
+           flushed into [destination] as long as it accepts items.  The
+           destination could be another pipe.
+        *)
+       val create: capacity:int -> destination:'a bounded_sender -> 'a t
+
+       (** [is_empty t] returns whether the local buffer of [t] is empty. *)
+       val is_empty : _ t -> bool
+
+       (** [length t] the number of items in the internal buffer *)
+       val length: _ t -> int
+
+       (** [flush_pipe t] sends as many items from the local buffer as possible,
+                       which could be none. *)
+       val flush_pipe: _ t -> unit
+
+       (** [push t item] tries to [flush_pipe] and then push [item]
+           into the pipe if its [capacity] allows.
+           Returns [None] if there is no more room
+        *)
+       val push : 'a t -> 'a bounded_sender
+end = struct
+       (* items are enqueued in [q], and then flushed to [connect_to] *)
+       type 'a t =
+               { q: 'a Queue.t
+               ; destination: 'a bounded_sender
+               ; capacity: int
+               }
+
+       let create ~capacity ~destination =
+               { q = Queue.create (); capacity; destination }
+
+       let rec flush_pipe t =
+               if not Queue.(is_empty t.q) then
+                       let item = Queue.peek t.q in
+                       match t.destination item with
+                       | None -> () (* no room *)
+                       | Some () ->
+                               (* successfully sent item to next stage *)
+                               let _ = Queue.pop t.q in
+                               (* continue trying to send more items *)
+                               flush_pipe t
+
+       let push t item =
+               (* first try to flush as many items from this pipe as possible to make room,
+                  it is important to do this first to preserve the order of the items
+                *)
+               flush_pipe t;
+               if Queue.length t.q < t.capacity then begin
+                       (* enqueue, instead of sending directly.
+                          this ensures that [out] sees the items in the same order as we receive them
+                        *)
+                       Queue.push item t.q;
+                       Some (flush_pipe t)
+               end else None
+
+       let is_empty t = Queue.is_empty t.q
+       let length t = Queue.length t.q
+end
+
 type watch = {
        con: t;
        token: string;
        path: string;
        base: string;
        is_relative: bool;
+       pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
 }
 
 and t = {
@@ -38,8 +110,36 @@ and t = {
        anonid: int;
        mutable stat_nb_ops: int;
        mutable perm: Perms.Connection.t;
+       pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
 }
 
+module Watch = struct
+       module T = struct
+               type t = watch
+
+               let compare w1 w2 =
+                       (* cannot compare watches from different connections *)
+                       assert (w1.con == w2.con);
+                       match String.compare w1.token w2.token with
+                       | 0 -> String.compare w1.path w2.path
+                       | n -> n
+       end
+       module Set = Set.Make(T)
+
+       let flush_events t =
+               BoundedPipe.flush_pipe t.pending_watchevents;
+               not (BoundedPipe.is_empty t.pending_watchevents)
+
+       let pending_watchevents t =
+               BoundedPipe.length t.pending_watchevents
+end
+
+let source_flush_watchevents t =
+       BoundedPipe.flush_pipe t.pending_source_watchevents
+
+let source_pending_watchevents t =
+       BoundedPipe.length t.pending_source_watchevents
+
 let mark_as_bad con =
        match con.dom with
        |None -> ()
@@ -67,7 +167,8 @@ let watch_create ~con ~path ~token = {
        token = token;
        path = path;
        base = get_path con;
-       is_relative = path.[0] <> '/' && path.[0] <> '@'
+       is_relative = path.[0] <> '/' && path.[0] <> '@';
+       pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
 }
 
 let get_con w = w.con
@@ -93,6 +194,9 @@ let make_perm dom =
        Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
 
 let create xbcon dom =
+       let destination (watch, pkt) =
+               BoundedPipe.push watch.pending_watchevents pkt
+       in
        let id =
                match dom with
                | None -> let old = !anon_id_next in incr anon_id_next; old
@@ -109,6 +213,16 @@ let create xbcon dom =
        anonid = id;
        stat_nb_ops = 0;
        perm = make_perm dom;
+
+       (* the actual capacity will be lower, this is used as an overflow
+          buffer: anything that doesn't fit elsewhere gets put here, only
+          limited by the amount of watches that you can generate with a
+          single xenstore command (which is finite, although possibly very
+          large in theory for Dom0).  Once the pipe here has any contents the
+          domain is blocked from sending more commands until it is empty
+          again though.
+        *)
+       pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
        }
        in
        Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
@@ -127,11 +241,17 @@ let set_target con target_domid =
 
 let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
 
-let send_reply con tid rid ty data =
+let packet_of con tid rid ty data =
        if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
-               Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000")
+               Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
        else
-               Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data)
+               Xenbus.Xb.Packet.create tid rid ty data
+
+let send_reply con tid rid ty data =
+       let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
+       (* should never happen: we only process an input packet when there is room for an output packet *)
+       (* and the limit for replies is different from the limit for watch events *)
+       assert (result <> None)
 
 let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
 let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
@@ -181,11 +301,11 @@ let del_watch con path token =
        apath, w
 
 let del_watches con =
-  Hashtbl.clear con.watches;
+  Hashtbl.reset con.watches;
   con.nb_watches <- 0
 
 let del_transactions con =
-  Hashtbl.clear con.transactions
+  Hashtbl.reset con.transactions
 
 let list_watches con =
        let ll = Hashtbl.fold
@@ -208,21 +328,29 @@ let lookup_watch_perm path = function
 let lookup_watch_perms oldroot root path =
        lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)
 
-let fire_single_watch_unchecked watch =
+let fire_single_watch_unchecked source watch =
        let data = Utils.join_by_null [watch.path; watch.token; ""] in
-       send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data
+       let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in
+
+       match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
+       | Some () -> () (* packet queued *)
+       | None ->
+                       (* a well behaved Dom0 shouldn't be able to trigger this,
+                          if it happens it is likely a Dom0 bug causing runaway memory usage
+                        *)
+                       failwith "watch event overflow, cannot happen"
 
-let fire_single_watch (oldroot, root) watch =
+let fire_single_watch source (oldroot, root) watch =
        let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in
        let perms = lookup_watch_perms oldroot root abspath in
        if Perms.can_fire_watch watch.con.perm perms then
-               fire_single_watch_unchecked watch
+               fire_single_watch_unchecked source watch
        else
                let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in
                let con = get_domstr watch.con in
                Logging.watch_not_fired ~con perms (Store.Path.to_string abspath)
 
-let fire_watch roots watch path =
+let fire_watch source roots watch path =
        let new_path =
                if watch.is_relative && path.[0] = '/'
                then begin
@@ -232,7 +360,7 @@ let fire_watch roots watch path =
                end else
                        path
        in
-       fire_single_watch roots { watch with path = new_path }
+       fire_single_watch source roots { watch with path = new_path }
 
 (* Search for a valid unused transaction id. *)
 let rec valid_transaction_id con proposed_id =
@@ -279,6 +407,7 @@ let get_transaction con tid =
 let do_input con = Xenbus.Xb.input con.xb
 let has_more_input con = Xenbus.Xb.has_more_input con.xb
 
+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
 let has_output con = Xenbus.Xb.has_output con.xb
 let has_old_output con = Xenbus.Xb.has_old_output con.xb
 let has_new_output con = Xenbus.Xb.has_new_output con.xb
@@ -286,7 +415,7 @@ let peek_output con = Xenbus.Xb.peek_output con.xb
 let do_output con = Xenbus.Xb.output con.xb
 
 let has_more_work con =
-       has_more_input con || not (has_old_output con) && has_new_output con
+       (has_more_input con && can_input con) || not (has_old_output con) && has_new_output con
 
 let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1
 
index 7efdf3e5e05e14fcdcf04c278bc2ee5f6e5a0195..39190c19ec5817b057a0e99be4d69bdca4b1b78b 100644 (file)
@@ -22,22 +22,30 @@ type t = {
        domains: (int, Connection.t) Hashtbl.t;
        ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
        mutable watches: (string, Connection.watch list) Trie.t;
+       mutable has_pending_watchevents: Connection.Watch.Set.t
 }
 
 let create () = {
        anonymous = Hashtbl.create 37;
        domains = Hashtbl.create 37;
        ports = Hashtbl.create 37;
-       watches = Trie.create ()
+       watches = Trie.create ();
+       has_pending_watchevents = Connection.Watch.Set.empty;
 }
 
+let get_capacity () =
+       (* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
+       { Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
+
 let add_anonymous cons fd _can_write =
-       let xbcon = Xenbus.Xb.open_fd fd in
+       let capacity = get_capacity () in
+       let xbcon = Xenbus.Xb.open_fd fd ~capacity in
        let con = Connection.create xbcon None in
        Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
 
 let add_domain cons dom =
-       let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in
+       let capacity = get_capacity () in
+       let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
        let con = Connection.create xbcon (Some dom) in
        Hashtbl.add cons.domains (Domain.get_id dom) con;
        match Domain.get_port dom with
@@ -48,7 +56,9 @@ let select ?(only_if = (fun _ -> true)) cons =
        Hashtbl.fold (fun _ con (ins, outs) ->
                if (only_if con) then (
                        let fd = Connection.get_fd con in
-                       (fd :: ins,  if Connection.has_output con then fd :: outs else outs)
+                       let in_fds = if Connection.can_input con then fd :: ins else ins in
+                       let out_fds = if Connection.has_output con then fd :: outs else outs in
+                       in_fds, out_fds
                ) else (ins, outs)
        )
        cons.anonymous ([], [])
@@ -67,10 +77,17 @@ let del_watches_of_con con watches =
        | [] -> None
        | ws -> Some ws
 
+let del_watches cons con =
+       Connection.del_watches con;
+       cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+       cons.has_pending_watchevents <-
+               cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
+               Connection.get_con w != con
+
 let del_anonymous cons con =
        try
                Hashtbl.remove cons.anonymous (Connection.get_fd con);
-               cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+               del_watches cons con;
                Connection.close con
        with exn ->
                debug "del anonymous %s" (Printexc.to_string exn)
@@ -85,7 +102,7 @@ let del_domain cons id =
                    | Some p -> Hashtbl.remove cons.ports p
                    | None -> ())
                 | None -> ());
-               cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+               del_watches cons con;
                Connection.close con
        with exn ->
                debug "del domain %u: %s" id (Printexc.to_string exn)
@@ -136,31 +153,33 @@ let del_watch cons con path token =
                cons.watches <- Trie.set cons.watches key watches;
        watch
 
-let del_watches cons con =
-       Connection.del_watches con;
-       cons.watches <- Trie.map (del_watches_of_con con) cons.watches
-
 (* path is absolute *)
-let fire_watches ?oldroot root cons path recurse =
+let fire_watches ?oldroot source root cons path recurse =
        let key = key_of_path path in
        let path = Store.Path.to_string path in
        let roots = oldroot, root in
        let fire_watch _ = function
                | None         -> ()
-               | Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches
+               | Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
        in
        let fire_rec _x = function
                | None         -> ()
                | Some watches ->
-                       List.iter (Connection.fire_single_watch roots) watches
+                       List.iter (Connection.fire_single_watch source roots) watches
        in
        Trie.iter_path fire_watch cons.watches key;
        if recurse then
                Trie.iter fire_rec (Trie.sub cons.watches key)
 
+let send_watchevents cons con =
+       cons.has_pending_watchevents <-
+               cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
+       Connection.source_flush_watchevents con
+
 let fire_spec_watches root cons specpath =
+       let source = find_domain cons 0 in
        iter cons (fun con ->
-               List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath))
+               List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
 
 let set_target cons domain target_domain =
        let con = find_domain cons domain in
@@ -196,3 +215,13 @@ let debug cons =
        let anonymous = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.anonymous [] in
        let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
        String.concat "" (domains @ anonymous)
+
+let debug_watchevents cons con =
+       (* == (physical equality)
+          has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
+          comparison to fail due to having a 'functional value' which cannot be compared.
+        *)
+       let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
+       let pending = s |> Connection.Watch.Set.elements
+               |> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
+       Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending
index 1a5d2f34a67829a8517c484dd31f9ac0481d9e17..9e52367094749fa4ff32dc21ee689e8cd6cf8a11 100644 (file)
@@ -25,6 +25,13 @@ let default_config_dir = Paths.xen_config_dir
 let maxwatch = ref (100)
 let maxtransaction = ref (10)
 let maxrequests = ref (1024)   (* maximum requests per transaction *)
+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. in-flight requests / domain *)
+let maxwatchevents = ref (1024)
+(*
+       maximum outstanding watch events per watch,
+       recommended >= maxoutstanding to avoid blocking backend transactions due to
+       malicious frontends
+ *)
 
 let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *)
 let conflict_burst_limit = ref 5.0
index 4ae48e42d47da1de519e0fef192d0ea022921208..9d034e744b4b032896546b8dbd0244178d082aa4 100644 (file)
@@ -62,6 +62,8 @@ quota-maxwatch = 100
 quota-transaction = 10
 quota-maxrequests = 1024
 quota-path-max = 1024
+quota-maxoutstanding = 1024
+quota-maxwatchevents = 1024
 
 # Activate filed base backend
 persistent = false
index 72629ee38b398b4fa6d8bc8855a25ded0a246890..d2a3ba064ed55e019003d2726faad1b6c0b463dc 100644 (file)
@@ -56,7 +56,7 @@ let split_one_path data con =
        | path :: "" :: [] -> Store.Path.create path (Connection.get_path con)
        | _                -> raise Invalid_Cmd_Args
 
-let process_watch t cons =
+let process_watch source t cons =
        let oldroot = t.Transaction.oldroot in
        let newroot = Store.get_root t.store in
        let ops = Transaction.get_paths t |> List.rev in
@@ -66,8 +66,9 @@ let process_watch t cons =
                | Xenbus.Xb.Op.Rm       -> true, None, oldroot
                | Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot
                | _              -> raise (Failure "huh ?") in
-               Connections.fire_watches ?oldroot root cons (snd op) recurse in
-       List.iter (fun op -> do_op_watch op cons) ops
+               Connections.fire_watches ?oldroot source root cons (snd op) recurse in
+       List.iter (fun op -> do_op_watch op cons) ops;
+       Connections.send_watchevents cons source
 
 let create_implicit_path t perm path =
        let dirname = Store.Path.get_parent path in
@@ -99,6 +100,20 @@ let do_debug con t _domains cons data =
        | "watches" :: _ ->
                let watches = Connections.debug cons in
                Some (watches ^ "\000")
+       | "xenbus" :: domid :: _ ->
+               let domid = int_of_string domid in
+               let con = Connections.find_domain cons domid in
+               let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s"
+                       (Xenbus.Xb.debug con.xb)
+                       (Connection.source_pending_watchevents con)
+                       (Connection.can_input con)
+                       (Connection.has_more_input con)
+                       (Connection.has_old_output con)
+                       (Connection.has_new_output con)
+                       (Connection.has_more_work con)
+                       (Connections.debug_watchevents cons con)
+               in
+               Some s
        | "mfn" :: domid :: _ ->
                let domid = int_of_string domid in
                let con = Connections.find_domain cons domid in
@@ -207,7 +222,7 @@ let reply_ack fct con t doms cons data =
        fct con t doms cons data;
        Packet.Ack (fun () ->
                if Transaction.get_id t = Transaction.none then
-                       process_watch t cons
+                       process_watch con t cons
        )
 
 let reply_data fct con t doms cons data =
@@ -366,7 +381,7 @@ let do_watch con t _domains cons data =
        Packet.Ack (fun () ->
                (* xenstore.txt says this watch is fired immediately,
                   implying even if path doesn't exist or is unreadable *)
-               Connection.fire_single_watch_unchecked watch)
+               Connection.fire_single_watch_unchecked con watch)
 
 let do_unwatch con _t _domains cons data =
        let (node, token) =
@@ -397,7 +412,7 @@ let do_transaction_end con t domains cons data =
        if not success then
                raise Transaction_again;
        if commit then begin
-               process_watch t cons;
+               process_watch con t cons;
                match t.Transaction.ty with
                | Transaction.No ->
                        () (* no need to record anything *)
@@ -565,7 +580,8 @@ let process_packet ~store ~cons ~doms ~con ~req =
 let do_input store cons doms con =
        let newpacket =
                try
-                       Connection.do_input con
+                       if Connection.can_input con then Connection.do_input con
+                       else None
                with Xenbus.Xb.Reconnect ->
                        info "%s requests a reconnect" (Connection.get_domstr con);
                        History.reconnect con;
@@ -593,6 +609,7 @@ let do_input store cons doms con =
                Connection.incr_ops con
 
 let do_output _store _cons _doms con =
+       Connection.source_flush_watchevents con;
        if Connection.has_output con then (
                if Connection.has_new_output con then (
                        let packet = Connection.peek_output con in
index 0b6343dfc789175c1db8d0f0ffd1105cd0704c63..4f8fab2dd13a526a264c0d50ae4c8f43b1ab4b43 100644 (file)
@@ -102,6 +102,8 @@ let parse_config filename =
                ("quota-maxentity", Config.Set_int Quota.maxent);
                ("quota-maxsize", Config.Set_int Quota.maxsize);
                ("quota-maxrequests", Config.Set_int Define.maxrequests);
+               ("quota-maxoutstanding", Config.Set_int Define.maxoutstanding);
+               ("quota-maxwatchevents", Config.Set_int Define.maxwatchevents);
                ("quota-path-max", Config.Set_int Define.path_max);
                ("gc-max-overhead", Config.Set_int Define.gc_max_overhead);
                ("test-eagain", Config.Set_bool Transaction.test_eagain);