]> xenbits.xensource.com Git - people/dstodden/blktap.git/commitdiff
PR-1129: Rate limiting intro.
authorDaniel Stodden <daniel.stodden@citrix.com>
Tue, 15 Feb 2011 09:37:45 +0000 (01:37 -0800)
committerDaniel Stodden <daniel.stodden@citrix.com>
Tue, 15 Feb 2011 09:37:45 +0000 (01:37 -0800)
Simple I/O rate limiting intro. Bridging resource utilization across a
group of independent tapdisks.

Comprising:

 - block-valve: Top-level filter issuing bandwith requests, deferring
   I/O to acknowldgement. Usage: valve:/path/to/sock/un

 - td-rated: Stand alone bridge process, listing to bandwidth
   requests, typically from valve:/ instances. Includes a plugin
   interface for various rate limiting algorithms.

Algorithms (yet slightly experimental):

 - "Token Bucket". A classic, with some trivial modifications to
   promote batching.

 - "Meminfo". Watching /proc/meminfo for pagecache congestion.

Signed-off-by: Daniel Stodden <daniel.stodden@citrix.com>
.hgignore
drivers/Makefile
drivers/block-valve.c [new file with mode: 0644]
drivers/block-valve.h [new file with mode: 0644]
drivers/tapdisk-disktype.c
drivers/tapdisk-disktype.h
drivers/td-rated.c [new file with mode: 0644]
mk/blktap.spec.in

index 36f08e5ab73639dde023f1dd2660a3fab20d202b..fb47d5340419bc22f6a1bf78d01c8579498af8bd 100644 (file)
--- a/.hgignore
+++ b/.hgignore
@@ -21,6 +21,7 @@
 ^drivers/tapdisk-client$
 ^drivers/tapdisk-diff$
 ^drivers/tapdisk-stream$
+^drivers/td-rated$
 ^vhd/vhd-index$
 ^vhd/vhd-update$
 ^vhd/vhd-util$
index 9c68fe4db8392d34441add3d9869d323ce13b373..f37639f3c16980a7e880d5f79acc133e970c23a9 100644 (file)
@@ -3,7 +3,7 @@ include $(BLKTAP_ROOT)/Rules.mk
 
 LIBVHDDIR  = $(BLKTAP_ROOT)/vhd/lib
 
-IBIN       = tapdisk2 td-util tapdisk-stream tapdisk-diff
+IBIN       = tapdisk2 td-util td-rated tapdisk-stream tapdisk-diff
 LOCK_UTIL  = lock-util
 INST_DIR   = /usr/sbin
 
@@ -54,6 +54,7 @@ BLK-OBJS  := block-aio.o
 BLK-OBJS  += block-ram.o
 BLK-OBJS  += block-cache.o
 BLK-OBJS  += block-vhd.o
+BLK-OBJS  += block-valve.o
 BLK-OBJS  += block-vindex.o
 BLK-OBJS  += block-lcache.o
 
diff --git a/drivers/block-valve.c b/drivers/block-valve.c
new file mode 100644 (file)
index 0000000..9c0ed74
--- /dev/null
@@ -0,0 +1,692 @@
+/*
+ * Copyright (c) 2010, Citrix Systems, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of XenSource Inc. nor the names of its contributors
+ *       may be used to endorse or promote products derived from this software
+ *       without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "tapdisk.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-server.h"
+#include "tapdisk-interface.h"
+
+#include "block-valve.h"
+
+typedef struct td_valve td_valve_t;
+typedef struct td_valve_request td_valve_request_t;
+
+struct td_valve_request {
+       td_request_t            treq;
+       int                     secs;
+
+       struct list_head        entry;
+       td_valve_t             *valve;
+};
+
+struct td_valve_stats {
+       unsigned long long      stor;
+       unsigned long long      forw;
+};
+
+struct td_valve {
+       char                   *brname;
+       unsigned long           flags;
+
+       int                     sock;
+       event_id_t              sock_id;
+
+       event_id_t              sched_id;
+       event_id_t              retry_id;
+
+       unsigned int            cred;
+       unsigned int            need;
+       unsigned int            done;
+
+       struct list_head        stor;
+       struct list_head        forw;
+
+       td_valve_request_t      reqv[MAX_REQUESTS];
+       td_valve_request_t     *free[MAX_REQUESTS];
+       int                     n_free;
+
+       struct td_valve_stats   stats;
+};
+
+#define td_valve_for_each_stored_request(_req, _next, _valve)          \
+       list_for_each_entry_safe(_req, _next, &(_valve)->stor, entry)
+
+#define td_valve_for_each_forwarded_request(_req, _next, _valve)       \
+       list_for_each_entry_safe(_req, _next, &(_valve)->forw, entry)
+
+#define TD_VALVE_CONNECT_INTERVAL 2 /* s */
+
+#define TD_VALVE_RDLIMIT  (1<<0)
+#define TD_VALVE_WRLIMIT  (1<<1)
+#define TD_VALVE_KILLED   (1<<31)
+
+static void valve_schedule_retry(td_valve_t *);
+static void valve_conn_receive(td_valve_t *);
+static void valve_conn_request(td_valve_t *, unsigned long);
+static void valve_forward_stored_requests(td_valve_t *);
+static void valve_kill(td_valve_t *);
+
+#define DBG(_f, _a...)    if (1) { tlog_syslog(TLOG_DBG, _f, ##_a); }
+#define INFO(_f, _a...)   tlog_syslog(TLOG_INFO, "valve: " _f, ##_a)
+#define WARN(_f, _a...)   tlog_syslog(TLOG_WARN, "WARNING: "_f " in %s:%d", \
+                                     ##_a, __func__, __LINE__)
+#define ERR(_f, _a...)    tlog_syslog(TLOG_WARN, "ERROR: " _f " in %s:%d", \
+                                     ##_a, __func__, __LINE__)
+#define VERR(_err, _f, _a...) tlog_syslog(TLOG_WARN,                    \
+                                         "ERROR: err=%d (%s), " _f ".", \
+                                         _err, strerror(-(_err)), ##_a)
+#undef  PERROR
+#define PERROR(_f, _a...) VERR(-errno, _f, ##_a)
+
+#define BUG() do {                                             \
+               ERR("Aborting");                                \
+               td_panic();                                     \
+       } while (0)
+
+#define BUG_ON(_cond)                                          \
+       if (unlikely(_cond)) {                                  \
+               ERR("(%s) = %ld", #_cond, (long)(_cond));       \
+               BUG();                                          \
+       }
+
+#define WARN_ON(_cond) ({                                      \
+       int __cond = _cond;                                     \
+       if (unlikely(__cond))                                   \
+               WARN("(%s) = %ld", #_cond, (long)(_cond));      \
+       __cond;                                         \
+})
+
+#define ARRAY_SIZE(_a)   (sizeof(_a)/sizeof((_a)[0]))
+#define TREQ_SIZE(_treq) ((unsigned int)(_treq.secs) << 9)
+
+static td_valve_request_t *
+valve_alloc_request(td_valve_t *valve)
+{
+       td_valve_request_t *req = NULL;
+
+       if (valve->n_free)
+               req = valve->free[--valve->n_free];
+
+       return req;
+}
+
+static void
+valve_free_request(td_valve_t *valve, td_valve_request_t *req)
+{
+       BUG_ON(valve->n_free >= ARRAY_SIZE(valve->free));
+       list_del_init(&req->entry);
+       valve->free[valve->n_free++] = req;
+}
+
+static void
+__valve_sock_event(event_id_t id, char mode, void *private)
+{
+       td_valve_t *valve = private;
+
+       valve_conn_receive(valve);
+
+       valve_forward_stored_requests(valve);
+}
+
+static void
+valve_set_done_pending(td_valve_t *valve)
+{
+       WARN_ON(valve->done == 0);
+       tapdisk_server_mask_event(valve->sched_id, 0);
+}
+
+static void
+valve_clear_done_pending(td_valve_t *valve)
+{
+       WARN_ON(valve->done != 0);
+       tapdisk_server_mask_event(valve->sched_id, 1);
+}
+
+static void
+__valve_sched_event(event_id_t id, char mode, void *private)
+{
+       td_valve_t *valve = private;
+
+       if (likely(valve->done > 0))
+               /* flush valve->done */
+               valve_conn_request(valve, 0);
+}
+
+static void
+valve_sock_close(td_valve_t *valve)
+{
+       if (valve->sock >= 0) {
+               close(valve->sock);
+               valve->sock = -1;
+       }
+
+       if (valve->sock_id >= 0) {
+               tapdisk_server_unregister_event(valve->sock_id);
+               valve->sock_id = -1;
+       }
+
+       if (valve->sched_id >= 0) {
+               tapdisk_server_unregister_event(valve->sched_id);
+               valve->sched_id = -1;
+       }
+}
+
+static int
+valve_sock_open(td_valve_t *valve)
+{
+       struct sockaddr_un addr = { .sun_family = AF_UNIX };
+       int s, id, err;
+
+       s = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (s < 0) {
+               PERROR("socket");
+               err = -errno;
+               goto fail;
+       }
+
+       valve->sock = s;
+
+       if (valve->brname[0] == '/')
+               snprintf(addr.sun_path, sizeof(addr.sun_path),
+                        valve->brname);
+       else
+               snprintf(addr.sun_path, sizeof(addr.sun_path),
+                        "%s/%s", TD_VALVE_SOCKDIR, valve->brname);
+
+       err = connect(valve->sock, &addr, sizeof(addr));
+       if (err) {
+               err = -errno;
+               goto fail;
+       }
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                          valve->sock, 0,
+                                          __valve_sock_event,
+                                          valve);
+       if (id < 0) {
+               err = id;
+               goto fail;
+       }
+
+       valve->sock_id = id;
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                          -1, 0,
+                                          __valve_sched_event,
+                                          valve);
+       if (id < 0) {
+               err = id;
+               goto fail;
+       }
+
+       valve->sched_id = id;
+
+       INFO("Connected to %s", addr.sun_path);
+
+       valve->cred = 0;
+       valve->need = 0;
+       valve->done = 0;
+
+       valve_clear_done_pending(valve);
+
+       return 0;
+
+fail:
+       valve_sock_close(valve);
+       return err;
+}
+
+static int
+valve_sock_send(td_valve_t *valve, const void *msg, size_t size)
+{
+       ssize_t n;
+
+       n = send(valve->sock, msg, size, MSG_DONTWAIT);
+       if (n < 0)
+               return -errno;
+       if (n != size)
+               return -EPROTO;
+
+       return 0;
+}
+
+static int
+valve_sock_recv(td_valve_t *valve, void *msg, size_t size)
+{
+       ssize_t n;
+
+       n = recv(valve->sock, msg, size, MSG_DONTWAIT);
+       if (n < 0)
+               return -errno;
+
+       return n;
+}
+
+static void
+__valve_retry_timeout(event_id_t id, char mode, void *private)
+{
+       td_valve_t *valve = private;
+       int err;
+
+       err = valve_sock_open(valve);
+       if (!err)
+               tapdisk_server_unregister_event(valve->retry_id);
+}
+
+static void
+valve_schedule_retry(td_valve_t *valve)
+{
+       int id;
+
+       BUG_ON(valve->sock_id >= 0);
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                          -1, TD_VALVE_CONNECT_INTERVAL,
+                                          __valve_retry_timeout,
+                                          valve);
+       BUG_ON(id < 0);
+
+       valve->retry_id = id;
+}
+
+static void
+valve_conn_open(td_valve_t *valve)
+{
+       int err;
+
+       BUG_ON(valve->flags & TD_VALVE_KILLED);
+
+       err = valve_sock_open(valve);
+       if (err) {
+               WARN("%s: %s", valve->brname, strerror(-err));
+               valve_schedule_retry(valve);
+       }
+}
+
+static void
+valve_conn_close(td_valve_t *valve, int reset)
+{
+       td_valve_request_t *req, *next;
+
+       valve_sock_close(valve);
+
+       if (reset)
+               td_valve_for_each_stored_request(req, next, valve) {
+                       td_forward_request(req->treq);
+                       valve->stats.forw++;
+                       valve_free_request(valve, req);
+               }
+
+       WARN_ON(!list_empty(&valve->stor));
+}
+
+static void
+valve_conn_reset(td_valve_t *valve)
+{
+       valve_conn_close(valve, 1);
+       valve_conn_open(valve);
+}
+
+void
+valve_conn_receive(td_valve_t *valve)
+{
+       unsigned long buf[32], cred = 0;
+       ssize_t n;
+       int i, err;
+
+       n = valve_sock_recv(valve, buf, sizeof(buf));
+       if (!n) {
+               err = -ECONNRESET;
+               goto reset;
+       }
+
+       if (n < 0) {
+               err = n;
+               if (err != -EAGAIN)
+                       goto reset;
+       }
+
+       for (i = 0; i < n / sizeof(buf[0]); i++) {
+               err = WARN_ON(buf[i] >= TD_RLB_REQUEST_MAX);
+               if (err)
+                       goto kill;
+
+               cred += buf[i];
+       }
+
+       if (cred > valve->need) {
+               err = -EINVAL;
+               goto reset;
+       }
+
+       valve->cred += cred;
+       valve->need -= cred;
+
+       return;
+
+reset:
+       VERR(err, "resetting connection");
+       valve_conn_reset(valve);
+       return;
+
+kill:
+       ERR("Killing valve.");
+       valve_kill(valve);
+}
+
+static void
+valve_conn_request(td_valve_t *valve, unsigned long size)
+{
+       struct td_valve_req _req;
+       int err;
+
+       _req.need    = size;
+       _req.done    = valve->done;
+
+       valve->need += size;
+       valve->done  = 0;
+
+       valve_clear_done_pending(valve);
+
+       err = valve_sock_send(valve, &_req, sizeof(_req));
+       if (!err)
+               return;
+
+       VERR(err, "resetting connection");
+       valve_conn_reset(valve);
+}
+
+static int
+valve_expend_request(td_valve_t *valve, const td_request_t treq)
+{
+       if (valve->flags & TD_VALVE_KILLED)
+               return 0;
+
+       if (valve->sock < 0)
+               return 0;
+
+       if (valve->cred < TREQ_SIZE(treq))
+               return -EAGAIN;
+
+       valve->cred -= TREQ_SIZE(treq);
+
+       return 0;
+}
+
+static void
+__valve_complete_treq(td_request_t treq, int error)
+{
+       td_valve_request_t *req = treq.cb_data;
+       td_valve_t *valve = req->valve;
+
+       BUG_ON(req->secs < treq.secs);
+       req->secs -= treq.secs;
+
+       valve->done += TREQ_SIZE(treq);
+       valve_set_done_pending(valve);
+
+       if (!req->secs) {
+               td_complete_request(req->treq, error);
+               valve_free_request(valve, req);
+       }
+}
+
+static void
+valve_forward_stored_requests(td_valve_t *valve)
+{
+       td_valve_request_t *req, *next;
+       td_request_t clone;
+       int err;
+
+       td_valve_for_each_stored_request(req, next, valve) {
+
+               err = valve_expend_request(valve, req->treq);
+               if (err)
+                       break;
+
+               clone         = req->treq;
+               clone.cb      = __valve_complete_treq;
+               clone.cb_data = req;
+
+               td_forward_request(clone);
+               valve->stats.forw++;
+
+               list_move(&req->entry, &valve->forw);
+       }
+}
+
+static int
+valve_store_request(td_valve_t *valve, td_request_t treq)
+{
+       td_valve_request_t *req;
+
+       req = valve_alloc_request(valve);
+       if (!req)
+               return -EBUSY;
+
+       valve_conn_request(valve, TREQ_SIZE(treq));
+
+       req->treq = treq;
+       req->secs = treq.secs;
+
+       list_add_tail(&req->entry, &valve->stor);
+       valve->stats.stor++;
+
+       return 0;
+}
+
+static void
+valve_kill(td_valve_t *valve)
+{
+       valve->flags |= TD_VALVE_KILLED;
+       valve_conn_close(valve, 1);
+}
+
+static void
+valve_init(td_valve_t *valve, unsigned long flags)
+{
+       int i;
+
+       memset(valve, 0, sizeof(*valve));
+
+       INIT_LIST_HEAD(&valve->stor);
+       INIT_LIST_HEAD(&valve->forw);
+
+       valve->sock     = -1;
+       valve->sock_id  = -1;
+
+       valve->retry_id = -1;
+       valve->sched_id = -1;
+
+       valve->flags    = flags;
+
+       for (i = ARRAY_SIZE(valve->reqv) - 1; i >= 0; i--) {
+               td_valve_request_t *req = &valve->reqv[i];
+
+               req->valve = valve;
+               INIT_LIST_HEAD(&req->entry);
+
+               valve_free_request(valve, req);
+       }
+}
+
+static int
+td_valve_close(td_driver_t *driver)
+{
+       td_valve_t *valve = driver->data;
+
+       WARN_ON(!list_empty(&valve->stor));
+       WARN_ON(!list_empty(&valve->forw));
+
+       valve_conn_close(valve, 0);
+
+       if (valve->brname) {
+               free(valve->brname);
+               valve->brname = NULL;
+       }
+
+       return 0;
+}
+
+static int
+td_valve_open(td_driver_t *driver,
+             const char *name, td_flag_t flags)
+{
+       td_valve_t *valve = driver->data;
+       int err;
+
+       valve_init(valve, TD_VALVE_WRLIMIT);
+
+       valve->brname = strdup(name);
+       if (!valve->brname) {
+               err = -errno;
+               goto fail;
+       }
+
+       valve_conn_open(valve);
+
+       return 0;
+
+fail:
+       td_valve_close(driver);
+       return err;
+}
+
+static void
+td_valve_queue_request(td_driver_t *driver, td_request_t treq)
+{
+       td_valve_t *valve = driver->data;
+       int err;
+
+       switch (treq.op) {
+
+       case TD_OP_READ:
+               if (valve->flags & TD_VALVE_RDLIMIT)
+                       break;
+
+               goto forward;
+
+       case TD_OP_WRITE:
+               if (valve->flags & TD_VALVE_WRLIMIT)
+                       break;
+
+               goto forward;
+
+       default:
+               BUG();
+       }
+
+       err = valve_expend_request(valve, treq);
+       if (!err)
+               goto forward;
+
+       err = valve_store_request(valve, treq);
+       if (err)
+               td_complete_request(treq, -EBUSY);
+
+       return;
+
+forward:
+       td_forward_request(treq);
+       valve->stats.forw++;
+}
+
+static int
+td_valve_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
+{
+       return -EINVAL;
+}
+
+static int
+td_valve_validate_parent(td_driver_t *driver,
+                        td_driver_t *parent_driver, td_flag_t flags)
+{
+       return -EINVAL;
+}
+
+static void
+td_valve_stats(td_driver_t *driver, td_stats_t *st)
+{
+       td_valve_t *valve = driver->data;
+       td_valve_request_t *req, *next;
+       int n_reqs;
+
+       tapdisk_stats_field(st, "bridge", "d", valve->brname);
+       tapdisk_stats_field(st, "flags", "#x", valve->flags);
+
+       tapdisk_stats_field(st, "cred", "d", valve->cred);
+       tapdisk_stats_field(st, "need", "d", valve->need);
+       tapdisk_stats_field(st, "done", "d", valve->done);
+
+       /*
+        * stored is [ waiting, total-waits ]
+        */
+
+       n_reqs = 0;
+       td_valve_for_each_stored_request(req, next, valve)
+               n_reqs++;
+
+       tapdisk_stats_field(st, "stor", "[");
+       tapdisk_stats_val(st, "d", n_reqs);
+       tapdisk_stats_val(st, "llu", valve->stats.stor);
+       tapdisk_stats_leave(st, ']');
+
+       /*
+        * forwarded is [ in-flight, total-requests ]
+        */
+
+       n_reqs = 0;
+       td_valve_for_each_forwarded_request(req, next, valve)
+               n_reqs++;
+
+       tapdisk_stats_field(st, "forw", "[");
+       tapdisk_stats_val(st, "d", n_reqs);
+       tapdisk_stats_val(st, "llu", valve->stats.forw);
+       tapdisk_stats_leave(st, ']');
+}
+
+struct tap_disk tapdisk_valve = {
+       .disk_type                  = "tapdisk_valve",
+       .flags                      = 0,
+       .private_data_size          = sizeof(td_valve_t),
+       .td_open                    = td_valve_open,
+       .td_close                   = td_valve_close,
+       .td_queue_read              = td_valve_queue_request,
+       .td_queue_write             = td_valve_queue_request,
+       .td_get_parent_id           = td_valve_get_parent_id,
+       .td_validate_parent         = td_valve_validate_parent,
+       .td_stats                   = td_valve_stats,
+};
diff --git a/drivers/block-valve.h b/drivers/block-valve.h
new file mode 100644 (file)
index 0000000..344c53c
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2011, Citrix Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of XenSource Inc. nor the names of its contributors
+ *       may be used to endorse or promote products derived from this software
+ *       without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _TAPDISK_VALVE_H_
+#define _TAPDISK_VALVE_H_
+
+#define TD_VALVE_SOCKDIR          "/var/run/blktap/ratelimit"
+#define TD_RLB_CONN_MAX           1024
+#define TD_RLB_REQUEST_MAX        (8 << 20)
+
+struct td_valve_req {
+       unsigned long need;
+       unsigned long done;
+};
+
+#endif /* _TAPDISK_VALVE_H_ */
index 158cd5f104a84f67eb67243e047259382b515596..c1767ae209f73f3154b62d040e91f9731ce13b9e 100644 (file)
@@ -99,6 +99,11 @@ static const disk_info_t local_cache_disk = {
        0,
 };
 
+static const disk_info_t valve_disk = {
+       "valve",
+       "group rate limiting (valve)",
+       0,
+};
 
 const disk_info_t *tapdisk_disk_types[] = {
        [DISK_TYPE_AIO] = &aio_disk,
@@ -112,6 +117,7 @@ const disk_info_t *tapdisk_disk_types[] = {
        [DISK_TYPE_VINDEX]      = &vhd_index_disk,
        [DISK_TYPE_LOG] = &log_disk,
        [DISK_TYPE_LOCAL_CACHE] = &local_cache_disk,
+       [DISK_TYPE_VALVE]       = &valve_disk,
        0,
 };
 
@@ -132,6 +138,7 @@ extern struct tap_disk tapdisk_vhd_index;
 extern struct tap_disk tapdisk_log;
 #endif
 extern struct tap_disk tapdisk_local_cache;
+extern struct tap_disk tapdisk_valve;
 
 const struct tap_disk *tapdisk_disk_drivers[] = {
        [DISK_TYPE_AIO]         = &tapdisk_aio,
@@ -151,6 +158,7 @@ const struct tap_disk *tapdisk_disk_drivers[] = {
        [DISK_TYPE_LOG]         = &tapdisk_log,
 #endif
        [DISK_TYPE_LOCAL_CACHE] = &tapdisk_local_cache,
+       [DISK_TYPE_VALVE]       = &tapdisk_valve,
        0,
 };
 
index abcfd5e15e53315ab95a29ccd6ae483560403721..cb5fedf16641651d2827141aa60f09f25ffa4c42 100644 (file)
@@ -41,6 +41,7 @@
 #define DISK_TYPE_LOG         9
 #define DISK_TYPE_REMUS       10
 #define DISK_TYPE_LOCAL_CACHE 11
+#define DISK_TYPE_VALVE       12
 
 #define DISK_TYPE_NAME_MAX    32
 
diff --git a/drivers/td-rated.c b/drivers/td-rated.c
new file mode 100644 (file)
index 0000000..8730fa1
--- /dev/null
@@ -0,0 +1,1709 @@
+/*
+ * Copyright (c) 2011, Citrix Systems.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of XenSource Inc. nor the names of its contributors
+ *       may be used to endorse or promote products derived from this software
+ *       without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <signal.h>
+#include <getopt.h>
+#include <syslog.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/time.h>
+
+#include "block-valve.h"
+#include "compiler.h"
+#include "list.h"
+
+static void
+rlb_vlog_vfprintf(int prio, const char *fmt, va_list ap)
+{
+       vfprintf(stderr, fmt, ap); fputc('\n', stderr);
+}
+
+static void (*rlb_vlog)(int prio, const char *fmt, va_list ap);
+
+__printf(2, 3)
+static void
+rlb_log(int prio, const char *fmt, ...)
+{
+       va_list ap;
+       va_start(ap, fmt); rlb_vlog(prio, fmt, ap); va_end(ap);
+}
+
+static int debug = 0;
+
+#define DBG(_l, _f, _a...) if (debug >= _l) { rlb_log(LOG_DEBUG, _f, ##_a); }
+#define INFO(_f, _a...)    rlb_log(LOG_INFO, _f, ##_a)
+#define WARN(_f, _a...)    rlb_log(LOG_WARNING, "WARNING: " _f " in %s:%d", \
+                                  ##_a, __func__, __LINE__)
+#define ERR(_f, _a...)     rlb_log(LOG_ERR, "ERROR: " _f " in %s:%d", \
+                                  ##_a, __func__, __LINE__)
+#define PERROR(_f, _a...)  rlb_log(LOG_ERR, _f ": %s in %s:%d", \
+                                  ##_a, strerror(errno), __func__, __LINE__)
+
+#define BUG() do {                                             \
+               ERR("Aborting");                                \
+               abort();                                        \
+       } while (0)
+
+#define BUG_ON(_cond)                                          \
+       if (unlikely(_cond)) {                                  \
+               ERR("(%s) = %d", #_cond, _cond);                \
+               BUG();                                          \
+       }
+
+#define WARN_ON(_cond) ({                                      \
+       int __cond = _cond;                                     \
+       if (unlikely(__cond))                                   \
+               WARN("(%s) = %d", #_cond, _cond);               \
+       __cond;                                         \
+})
+
+#define MAX(a, b)       ((a) > (b) ? (a) : (b))
+#define MIN(a, b)       ((a) < (b) ? (a) : (b))
+
+#define ARRAY_SIZE(_a)  (sizeof(_a)/sizeof((_a)[0]))
+
+typedef struct ratelimit_bridge        td_rlb_t;
+typedef struct ratelimit_connection    td_rlb_conn_t;
+
+struct ratelimit_connection {
+       int                            sock;
+
+       unsigned long                  need; /* I/O requested */
+       unsigned long                  gntd; /* I/O granted, pending */
+
+       struct list_head               open; /* connected */
+       struct list_head               wait; /* need > 0 */
+
+       struct {
+               struct timeval         since;
+               struct timeval         total;
+       } wstat;
+};
+
+#define RLB_CONN_MAX                   1024
+
+struct ratelimit_ops {
+       void    (*usage)(td_rlb_t *rlb, FILE *stream, void *data);
+
+       int     (*create)(td_rlb_t *rlb, int argc, char **argv, void **data);
+       void    (*destroy)(td_rlb_t *rlb, void *data);
+
+       void    (*info)(td_rlb_t *rlb, void *data);
+
+       void    (*settimeo)(td_rlb_t *rlb, struct timeval **tv, void *data);
+       void    (*timeout)(td_rlb_t *rlb, void *data);
+       void    (*dispatch)(td_rlb_t *rlb, void *data);
+       void    (*reset)(td_rlb_t *rlb, void *data);
+};
+
+struct ratelimit_bridge {
+       char                          *name;
+       char                          *ident;
+
+       struct sockaddr_un             addr;
+       char                          *path;
+       int                            sock;
+
+       struct list_head               open; /* all connections */
+       struct list_head               wait; /* all in need */
+
+       struct timeval                 ts, now;
+
+       td_rlb_conn_t                  connv[RLB_CONN_MAX];
+       td_rlb_conn_t                 *free[RLB_CONN_MAX];
+       int                            n_free;
+
+       struct rlb_valve {
+               struct ratelimit_ops  *ops;
+               void                  *data;
+       } valve;
+};
+
+#define rlb_for_each_conn(_conn, _rlb)                                 \
+       list_for_each_entry(_conn, &(_rlb)->open, open)
+
+#define rlb_for_each_conn_safe(_conn, _next, _rlb)                     \
+       list_for_each_entry_safe(_conn, _next, &(_rlb)->open, open)
+
+#define rlb_for_each_waiting(_conn, _next, _rlb)                       \
+       list_for_each_entry(_conn, _next, &(_rlb)->wait, wait)
+
+#define rlb_for_each_waiting_safe(_conn, _next, _rlb)                  \
+       list_for_each_entry_safe(_conn, _next, &(_rlb)->wait, wait)
+
+#define rlb_conn_entry(_list)                  \
+       list_entry(_list, td_rlb_conn_t, open)
+
+#define rlb_wait_entry(_list)                  \
+       list_entry(_list, td_rlb_conn_t, wait)
+
+static struct ratelimit_ops *rlb_find_valve(const char *name);
+
+static int rlb_create_valve(td_rlb_t *, struct rlb_valve *,
+                           const char *name, int argc, char **argv);
+
+/*
+ * util
+ */
+
+#define case_G case 'G': case 'g'
+#define case_M case 'M': case 'm'
+#define case_K case 'K': case 'k'
+
+static long
+rlb_strtol(const char *s)
+{
+       unsigned long l, u = 1;
+       char *end, p, q;
+
+       l = strtoul(s, &end, 0);
+       if (!*end)
+               return l;
+
+       p = *end++;
+
+       switch (p) {
+       case_G: case_M: case_K:
+
+               q = *end++;
+
+               switch (q) {
+               case 'i':
+                       switch (p) {
+                       case_G:
+                               u *= 1000;
+                       case_M:
+                               u *= 1000;
+                       case_K:
+                               u *= 1000;
+                       }
+                       break;
+
+               case 0:
+                       switch (p) {
+                       case_G:
+                               u *= 1024;
+                       case_M:
+                               u *= 1024;
+                       case_K:
+                               u *= 1024;
+                       }
+                       break;
+
+               default:
+                       goto fail;
+               }
+               break;
+
+       case 0:
+               break;
+
+       default:
+               goto fail;
+       }
+
+       return l * u;
+
+fail:
+       return -EINVAL;
+}
+
+static char*
+vmprintf(const char *fmt, va_list ap)
+{
+       char *s;
+       int n;
+
+       n = vasprintf(&s, fmt, ap);
+       if (n < 0)
+               s = NULL;
+
+       return s;
+}
+
+__printf(1, 2)
+static char*
+mprintf(const char *fmt, ...)
+{
+       va_list ap;
+       char *s;
+
+       va_start(ap, fmt);
+       s = vmprintf(fmt, ap);
+       va_end(ap);
+
+       return s;
+}
+
+static int
+sysctl_vscanf(const char *name, const char *fmt, va_list ap)
+{
+       char *path = NULL;
+       FILE *s = NULL;
+       int rv;
+
+       path = mprintf("/proc/sys/%s", name);
+       if (!path) {
+               rv = -errno;
+               goto fail;
+       }
+
+       s = fopen(path, "r");
+       if (!s) {
+               rv = -errno;
+               goto fail;
+       }
+
+       rv = vfscanf(s, fmt, ap);
+fail:
+       if (s)
+               fclose(s);
+
+       if (path)
+               free(path);
+
+       return rv;
+}
+
+static int
+sysctl_scanf(const char *name, const char *fmt, ...)
+{
+       va_list(ap);
+       int rv;
+
+       va_start(ap, fmt);
+       rv = sysctl_vscanf(name, fmt, ap);
+       va_end(ap);
+
+       return rv;
+}
+
+static long
+sysctl_strtoul(const char *name)
+{
+       unsigned val;
+       int n;
+
+       n = sysctl_scanf(name, "%lu", &val);
+       if (n < 0)
+               return n;
+       if (n != 1)
+               return -EINVAL;
+
+       return val;
+}
+
+
+static long long
+rlb_tv_usec(const struct timeval *tv)
+{
+       long long us;
+
+       us  = tv->tv_sec;
+       us *= 1000000;
+       us += tv->tv_usec;
+
+       return us;
+}
+
+static long long
+rlb_usec_since(td_rlb_t *rlb, const struct timeval *since)
+{
+       struct timeval delta;
+
+       timersub(&rlb->now, since, &delta);
+
+       return rlb_tv_usec(&delta);
+}
+
+static inline void
+rlb_argv_shift(int *optind, int *argc, char ***argv)
+{
+       /* reset optind and args after '--' */
+
+       *optind -= 1;
+
+       *argc   -= *optind;
+       *argv   += *optind;
+
+       *optind  = 1;
+}
+
+/*
+ * socket I/O
+ */
+
+static void
+rlb_sock_close(td_rlb_t *rlb)
+{
+       if (rlb->path) {
+               unlink(rlb->path);
+               rlb->path = NULL;
+       }
+
+       if (rlb->sock >= 0) {
+               close(rlb->sock);
+               rlb->sock = -1;
+       }
+}
+
+static int
+rlb_sock_open(td_rlb_t *rlb)
+{
+       int s, err;
+
+       rlb->sock = -1;
+
+       s = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (s < 0) {
+               PERROR("socket");
+               err = -errno;
+               goto fail;
+       }
+
+       rlb->sock = s;
+
+       rlb->addr.sun_family = AF_UNIX;
+       snprintf(rlb->addr.sun_path, sizeof(rlb->addr.sun_path),
+                "%s/%s", TD_VALVE_SOCKDIR, rlb->name);
+
+       err = bind(rlb->sock, &rlb->addr, sizeof(rlb->addr));
+       if (err) {
+               PERROR("%s", rlb->addr.sun_path);
+               err = -errno;
+               goto fail;
+       }
+
+       rlb->path = rlb->addr.sun_path;
+
+       err = listen(rlb->sock, RLB_CONN_MAX);
+       if (err) {
+               PERROR("listen(%s)", rlb->addr.sun_path);
+               err = -errno;
+               goto fail;
+       }
+
+       return 0;
+
+fail:
+       rlb_sock_close(rlb);
+       return err;
+}
+
+static int
+rlb_sock_send(td_rlb_t *rlb, td_rlb_conn_t *conn,
+             const void *msg, size_t size)
+{
+       ssize_t n;
+
+       n = send(conn->sock, msg, size, MSG_DONTWAIT);
+       if (n < 0)
+               return -errno;
+       if (n && n != size)
+               return -EPROTO;
+
+       return 0;
+}
+
+static int
+rlb_sock_recv(td_rlb_t *rlb, td_rlb_conn_t *conn,
+             void *msg, size_t size)
+{
+       ssize_t n;
+
+       n = recv(conn->sock, msg, size, MSG_DONTWAIT);
+       if (n < 0)
+               return -errno;
+
+       return n;
+}
+
+static td_rlb_conn_t *
+rlb_conn_alloc(td_rlb_t *rlb)
+{
+       td_rlb_conn_t *conn = NULL;
+
+       if (likely(rlb->n_free > 0))
+               conn = rlb->free[--rlb->n_free];
+
+       return conn;
+}
+
+static void
+rlb_conn_free(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+       BUG_ON(rlb->n_free >= RLB_CONN_MAX);
+
+       rlb->free[rlb->n_free++] = conn;
+}
+
+static int
+rlb_conn_id(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+       return conn - rlb->connv;
+}
+
+static void
+rlb_conn_info(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+       long long wtime;
+       int waits;
+
+       wtime = 0;
+       waits = !list_empty(&conn->wait);
+       if (waits)
+               wtime = rlb_usec_since(rlb, &conn->wstat.since) / 1000;
+
+       WARN_ON(!!conn->need != waits);
+
+       INFO("conn[%d] needs %lu (since %llu ms, total %lu.%06lu s),"
+            " %lu granted",
+            rlb_conn_id(rlb, conn), conn->need, wtime,
+            conn->wstat.total.tv_sec, conn->wstat.total.tv_usec,
+            conn->gntd);
+}
+
+static void
+rlb_conn_infos(td_rlb_t *rlb)
+{
+       td_rlb_conn_t *conn;
+
+       rlb_for_each_conn(conn, rlb)
+               rlb_conn_info(rlb, conn);
+}
+
+static void
+rlb_conn_close(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+       int s = conn->sock;
+
+       INFO("Connection %d closed.", rlb_conn_id(rlb, conn));
+       rlb_conn_info(rlb, conn);
+
+       if (s) {
+               close(s);
+               conn->sock = -1;
+       }
+
+       list_del_init(&conn->wait);
+       list_del(&conn->open);
+
+       rlb_conn_free(rlb, conn);
+}
+
+static void
+rlb_conn_receive(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+       struct td_valve_req buf[32], req = { -1, -1 };
+       ssize_t n;
+       int i, err;
+
+       n = rlb_sock_recv(rlb, conn, buf, sizeof(buf));
+       if (!n)
+               goto close;
+
+       if (n < 0) {
+               err = n;
+               if (err != -EAGAIN)
+                       goto fail;
+       }
+
+       if (unlikely(n % sizeof(req))) {
+               err = -EPROTO;
+               goto fail;
+       }
+
+       for (i = 0; i < n / sizeof(buf[0]); i++) {
+               req = buf[i];
+
+               if (unlikely(req.need > TD_RLB_REQUEST_MAX)) {
+                       err = -EINVAL;
+                       goto fail;
+               }
+
+               if (unlikely(req.done > conn->gntd)) {
+                       err = -EINVAL;
+                       goto fail;
+               }
+
+               conn->need += req.need;
+               conn->gntd -= req.done;
+
+               DBG(8, "rcv: %lu/%lu need=%lu gntd=%lu",
+                   req.need, req.done, conn->need, conn->gntd);
+
+               if (unlikely(conn->need > TD_RLB_REQUEST_MAX)) {
+                       err = -EINVAL;
+                       goto fail;
+               }
+       }
+
+       if (conn->need && list_empty(&conn->wait)) {
+               list_add_tail(&conn->wait, &rlb->wait);
+               conn->wstat.since = rlb->now;
+       }
+
+       return;
+
+fail:
+       WARN("err = %d (%s)"
+            " (need %ld/%ld, %ld/%ld done),"
+            " closing connection.",
+            err, strerror(-err),
+            req.need, conn->need, req.done, conn->gntd);
+
+       rlb_conn_info(rlb, conn);
+close:
+       rlb_conn_close(rlb, conn);
+}
+
+static void
+rlb_conn_respond(td_rlb_t *rlb, td_rlb_conn_t *conn, unsigned long need)
+{
+       int err;
+
+       BUG_ON(need > conn->need);
+
+       err = rlb_sock_send(rlb, conn, &need, sizeof(need));
+       if (err)
+               goto fail;
+
+       conn->need -= need;
+       conn->gntd += need;
+
+       DBG(8, "snd: %lu need=%lu gntd=%lu", need, conn->need, conn->gntd);
+
+       if (!conn->need) {
+               struct timeval delta;
+
+               timersub(&rlb->now, &conn->wstat.since, &delta);
+               timeradd(&conn->wstat.total, &delta, &conn->wstat.total);
+
+               list_del_init(&conn->wait);
+       }
+
+       return;
+
+fail:
+       WARN("err = %d, killing connection.", err);
+       rlb_conn_close(rlb, conn);
+}
+
+static void
+rlb_accept_conn(td_rlb_t *rlb)
+{
+       td_rlb_conn_t *conn;
+       int s, err;
+
+       s = accept(rlb->sock, NULL, NULL);
+       if (!s) {
+               err = -errno;
+               goto fail;
+       }
+
+       conn = rlb_conn_alloc(rlb);
+       if (!conn) {
+               err = -ENOMEM;
+               close(s);
+               goto fail;
+       }
+
+       INFO("Accepting connection %d.", conn - rlb->connv);
+
+       memset(conn, 0, sizeof(*conn));
+       INIT_LIST_HEAD(&conn->wait);
+       conn->sock = s;
+       list_add_tail(&conn->open, &rlb->open);
+
+       return;
+
+fail:
+       WARN("err = %d", err);
+}
+
+static long long
+rlb_pending(td_rlb_t *rlb)
+{
+       td_rlb_conn_t *conn;
+       long long pend = 0;
+
+       rlb_for_each_conn(conn, rlb)
+               pend += conn->gntd;
+
+       return pend;
+}
+
+/*
+ * token bucket valve
+ */
+
+typedef struct ratelimit_token td_rlb_token_t;
+
+struct ratelimit_token {
+       long                      cred;
+       long                      cap;
+       long                      rate;
+       struct timeval            timeo;
+};
+
+static void
+rlb_token_settimeo(td_rlb_t *rlb, struct timeval **_tv, void *data)
+{
+       td_rlb_token_t *token = data;
+       struct timeval *tv = &token->timeo;
+       long long us;
+
+       if (list_empty(&rlb->wait)) {
+               *_tv = NULL;
+               return;
+       }
+
+       WARN_ON(token->cred >= 0);
+
+       us  = -token->cred;
+       us *= 1000000;
+       us /= token->rate;
+
+       tv->tv_sec  = us / 1000000;
+       tv->tv_usec = us % 1000000;
+
+       WARN_ON(!timerisset(tv));
+
+       *_tv = tv;
+}
+
+static void
+rlb_token_refill(td_rlb_t *rlb, td_rlb_token_t *token)
+{
+       struct timeval tv;
+       long long cred, max_usec;
+
+       /* max time needed to refill up to cap */
+
+       max_usec  = token->cap - token->cred;
+       max_usec *= 1000000;
+       max_usec += token->rate - 1;
+       max_usec /= token->rate;
+
+       /* actual credit gained */
+
+       timersub(&rlb->now, &rlb->ts, &tv);
+
+       cred  = rlb_tv_usec(&tv);
+       cred  = MIN(cred, max_usec);
+       cred *= token->rate;
+       cred /= 1000000;
+
+       /* up to cap */
+
+       token->cred += cred;
+       token->cred  = MIN(token->cred, token->cap);
+}
+
+static void
+rlb_token_dispatch(td_rlb_t *rlb, void *data)
+{
+       td_rlb_token_t *token = data;
+       td_rlb_conn_t *conn, *next;
+
+       rlb_token_refill(rlb, token);
+
+       rlb_for_each_waiting_safe(conn, next, rlb) {
+               if (token->cred < 0)
+                       break;
+
+               token->cred -= conn->need;
+
+               rlb_conn_respond(rlb, conn, conn->need);
+       }
+}
+
+static void
+rlb_token_reset(td_rlb_t *rlb, void *data)
+{
+       td_rlb_token_t *token = data;
+
+       token->cred = token->cap;
+}
+
+static void
+rlb_token_destroy(td_rlb_t *rlb, void *data)
+{
+       td_rlb_token_t *token = data;
+
+       if (token)
+               free(token);
+}
+
+static int
+rlb_token_create(td_rlb_t *rlb, int argc, char **argv, void **data)
+{
+       td_rlb_token_t *token;
+       int err;
+
+       token = calloc(1, sizeof(*token));
+       if (!token) {
+               err = -ENOMEM;
+               goto fail;
+       }
+
+       token->rate = 0;
+       token->cap  = 0;
+
+       do {
+               const struct option longopts[] = {
+                       { "rate",        1, NULL, 'r' },
+                       { "cap",         1, NULL, 'c' },
+                       { NULL,          0, NULL,  0  }
+               };
+               int c;
+
+               c = getopt_long(argc, argv, "r:c:", longopts, NULL);
+               if (c < 0)
+                       break;
+
+               switch (c) {
+               case 'r':
+                       token->rate = rlb_strtol(optarg);
+                       if (token->rate < 0) {
+                               ERR("invalid --rate");
+                               goto usage;
+                       }
+                       break;
+
+               case 'c':
+                       token->cap = rlb_strtol(optarg);
+                       if (token->cap < 0) {
+                               ERR("invalid --cap");
+                               goto usage;
+                       }
+                       break;
+
+               case '?':
+                       goto usage;
+
+               default:
+                       BUG();
+               }
+       } while (1);
+
+       if (!token->rate) {
+               ERR("--rate required");
+               goto usage;
+       }
+
+       rlb_token_reset(rlb, token);
+
+       *data = token;
+
+       return 0;
+
+fail:
+       if (token)
+               free(token);
+
+       return err;
+
+usage:
+       err = -EINVAL;
+       goto fail;
+}
+
+static void
+rlb_token_usage(td_rlb_t *rlb, FILE *stream, void *data)
+{
+       fprintf(stream,
+               " {-t|--type}=token --"
+               " {-r|--rate}=<rate [KMG]>"
+               " {-c|--cap}=<size [KMG]>");
+}
+
+static void
+rlb_token_info(td_rlb_t *rlb, void *data)
+{
+       td_rlb_token_t *token = data;
+
+       INFO("TOKEN: rate: %ld B/s cap: %ld B cred: %ld B",
+            token->rate, token->cap, token->cred);
+}
+
+static struct ratelimit_ops rlb_token_ops = {
+       .usage    = rlb_token_usage,
+       .create   = rlb_token_create,
+       .destroy  = rlb_token_destroy,
+       .info     = rlb_token_info,
+
+       .settimeo = rlb_token_settimeo,
+       .timeout  = rlb_token_dispatch,
+       .dispatch = rlb_token_dispatch,
+       .reset    = rlb_token_reset,
+};
+
+/*
+ * meminfo valve
+ */
+
+typedef struct ratelimit_meminfo td_rlb_meminfo_t;
+
+struct ratelimit_meminfo {
+       unsigned int                   period;
+       struct timeval                 ts;
+
+       FILE                          *s;
+
+       unsigned long                  total;
+       unsigned long                  dirty;
+       unsigned long                  writeback;
+
+       unsigned int                   limit_hi;
+       unsigned int                   limit_lo;
+       unsigned int                   congested;
+
+       struct rlb_valve               valve;
+       struct timeval                 timeo;
+};
+
+static void
+rlb_meminfo_info(td_rlb_t *rlb, void *data)
+{
+       td_rlb_meminfo_t *m = data;
+
+       INFO("MEMINFO: lo/hi: %u/%u%% period: %u ms",
+            m->limit_lo, m->limit_hi, m->period);
+
+       INFO("MEMINFO: total %lu kB, dirty/writeback %lu/%lu kB",
+            m->total, m->dirty, m->writeback);
+
+       m->valve.ops->info(rlb, m->valve.data);
+}
+
+static void
+rlb_meminfo_close(td_rlb_meminfo_t *m)
+{
+       if (m->s) {
+               fclose(m->s);
+               m->s = NULL;
+       }
+}
+
+static int
+rlb_meminfo_open(td_rlb_meminfo_t *m)
+{
+       FILE *s;
+       int err;
+
+       m->s = NULL;
+
+       s = fopen("/proc/meminfo", "r");
+       if (!s) {
+               err = -errno;
+               goto fail;
+       }
+
+       m->s = s;
+
+       return 0;
+
+fail:
+       rlb_meminfo_close(m);
+       return err;
+}
+
+static inline int __test_bit(int n, unsigned long *bitmap)
+{
+       return !!(*bitmap & (1UL<<n));
+}
+
+static inline void __clear_bit(int n, unsigned long *bitmap)
+{
+       *bitmap &= ~(1UL<<n);
+}
+
+static struct ratelimit_meminfo_scan {
+       const char    *format;
+       ptrdiff_t      ptrdiff;
+} rlb_meminfo_scanfs[] = {
+       { "MemTotal:  %lu kB",
+         offsetof(struct ratelimit_meminfo, total) },
+       { "Dirty:     %lu kB",
+         offsetof(struct ratelimit_meminfo, dirty) },
+       { "Writeback: %lu kB",
+         offsetof(struct ratelimit_meminfo, writeback) },
+};
+
+static int
+rlb_meminfo_scan(td_rlb_meminfo_t *m)
+{
+       const int n_keys = ARRAY_SIZE(rlb_meminfo_scanfs);
+       unsigned long pending;
+       int err;
+
+       err = rlb_meminfo_open(m);
+       if (err)
+               goto fail;
+
+       pending = (1UL << n_keys) - 1;
+
+       do {
+               char buf[80], *b;
+               int i;
+
+               b = fgets(buf, sizeof(buf), m->s);
+               if (!b)
+                       break;
+
+               for (i = 0; i < n_keys; i++) {
+                       struct ratelimit_meminfo_scan *scan;
+                       unsigned long val, *ptr;
+                       int n;
+
+                       if (!__test_bit(i, &pending))
+                               continue;
+
+                       scan = &rlb_meminfo_scanfs[i];
+
+                       n = sscanf(buf, scan->format, &val);
+                       if (n != 1)
+                               continue;
+
+                       ptr  = (void*)m + scan->ptrdiff;
+                       *ptr = val;
+
+                       __clear_bit(i, &pending);
+               }
+
+       } while (pending);
+
+       if (pending) {
+               err = -ESRCH;
+               goto fail;
+       }
+
+       err = 0;
+fail:
+       rlb_meminfo_close(m);
+       return err;
+}
+
+static void
+rlb_meminfo_usage(td_rlb_t *rlb, FILE *stream, void *data)
+{
+       td_rlb_meminfo_t *m = data;
+
+       fprintf(stream,
+               " {-t|--type}=meminfo "
+               " {-H|--high}=<percent> {-l|--low}=<percent>"
+               " {-p|--period}=<msecs> --");
+
+       if (m && m->valve.ops) {
+               m->valve.ops->usage(rlb, stream, m->valve.data);
+       } else
+               fprintf(stream, " {-t|--type}={...}");
+}
+
+static void
+rlb_meminfo_destroy(td_rlb_t *rlb, void *data)
+{
+       td_rlb_meminfo_t *m = data;
+
+       if (m) {
+               if (m->valve.data) {
+                       m->valve.ops->destroy(rlb, m->valve.data);
+                       m->valve.data = NULL;
+               }
+
+               free(m);
+       }
+}
+
+static int
+rlb_meminfo_create(td_rlb_t *rlb, int argc, char **argv, void **data)
+{
+       td_rlb_meminfo_t *m;
+       const char *type;
+       long dbr;
+       int err;
+
+       m = calloc(1, sizeof(*m));
+       if (!m) {
+               err = -errno;
+               goto fail;
+       }
+
+       type      = NULL;
+       m->period = 100;
+
+       do {
+               const struct option longopts[] = {
+                       { "period",    1, NULL, 'p' },
+                       { "type",      1, NULL, 't' },
+                       { "high",      1, NULL, 'H' },
+                       { "low",       1, NULL, 'L' },
+                       { NULL,        0, NULL,  0  }
+               };
+               int c;
+
+               c = getopt_long(argc, argv, "p:t:H:L:", longopts, NULL);
+               if (c < 0)
+                       break;
+
+               switch (c) {
+               case 'p':
+                       m->period = rlb_strtol(optarg);
+                       if (m->period < 0)
+                               goto usage;
+                       break;
+
+               case 'H':
+                       m->limit_hi = strtoul(optarg, NULL, 0);
+                       break;
+
+               case 'L':
+                       m->limit_lo = strtoul(optarg, NULL, 0);
+                       break;
+
+               case 't':
+                       type = optarg;
+                       break;
+
+               case '?':
+                       goto usage;
+
+               default:
+                       BUG();
+               }
+       } while (1);
+
+       if (!type) {
+               ERR("--type required");
+               goto usage;
+       }
+
+       if (!m->limit_hi || !m->limit_lo) {
+               ERR("--high/--low required");
+               goto usage;
+       }
+
+       if (m->limit_lo >= m->limit_hi) {
+               ERR("invalid --high/--low ratio");
+               goto usage;
+       }
+
+       dbr = sysctl_strtoul("vm/dirty_background_ratio");
+       if (dbr < 0) {
+               err = dbr;
+               goto fail;
+       }
+
+       if (0 && m->limit_lo < dbr) {
+               ERR("--low %u is less than vm.dirty_background_ratio (= %ld)",
+                   m->limit_lo, dbr);
+               err = -EINVAL;
+               goto fail;
+       }
+
+       *data = m;
+
+       rlb_argv_shift(&optind, &argc, &argv);
+
+       err = rlb_create_valve(rlb, &m->valve, type, argc, argv);
+       if (err) {
+               if (err == -EINVAL)
+                       goto usage;
+               goto fail;
+       }
+
+       err = rlb_meminfo_scan(m);
+       if (err)
+               goto fail;
+
+       return 0;
+
+fail:
+       rlb_meminfo_destroy(rlb, m);
+       WARN("err = %d", err);
+       return err;
+
+usage:
+       err = -EINVAL;
+       return err;
+};
+
+static void
+rlb_meminfo_settimeo(td_rlb_t *rlb, struct timeval **_tv, void *data)
+{
+       td_rlb_meminfo_t *m = data;
+       int idle;
+
+       idle = list_empty(&rlb->wait);
+       BUG_ON(!idle && !m->congested);
+
+       if (m->congested) {
+               m->valve.ops->settimeo(rlb, _tv, m->valve.data);
+               return;
+       }
+
+       *_tv = NULL;
+}
+
+static void
+rlb_meminfo_timeout(td_rlb_t *rlb, void *data)
+{
+       td_rlb_meminfo_t *m = data;
+
+       WARN_ON(!m->congested);
+
+       if (m->congested)
+               m->valve.ops->timeout(rlb, m->valve.data);
+}
+
+static int
+rlb_meminfo_test_high(td_rlb_t *rlb, td_rlb_meminfo_t *m, long long cred)
+{
+       long long lo;
+
+       if (m->congested) {
+               /* hysteresis */
+
+               lo  = m->total;
+               lo *= m->limit_lo;
+               lo /= 100;
+
+               if (cred >= lo)
+                       return 0;
+
+       } else
+               if (cred <= 0) {
+                       m->valve.ops->reset(rlb, m->valve.data);
+                       return 1;
+               }
+
+       return m->congested;
+}
+
+static void
+rlb_meminfo_dispatch_low(td_rlb_t *rlb, td_rlb_meminfo_t *m,
+                        long long *_cred)
+{
+       td_rlb_conn_t *conn, *next;
+       long long cred = *_cred, grant;
+
+       rlb_for_each_waiting_safe(conn, next, rlb) {
+
+               if (cred <= 0)
+                       break;
+
+               grant = MIN(cred, conn->need);
+
+               rlb_conn_respond(rlb, conn, grant);
+
+               cred -= grant;
+       }
+
+       *_cred = cred;
+}
+
+static void
+rlb_meminfo_dispatch(td_rlb_t *rlb, void *data)
+{
+       td_rlb_meminfo_t *m = data;
+       long long us, hi, cred, dirty, pend;
+
+       /* we run only once per m->period */
+
+       us = rlb_usec_since(rlb, &m->ts);
+       if (us / 1000 > m->period) {
+               rlb_meminfo_scan(m);
+               m->ts = rlb->now;
+       }
+
+       /* uncongested credit:
+          memory below hi watermark minus pending I/O */
+
+       hi  = m->total;
+       hi *= m->limit_hi;
+       hi /= 100;
+
+       dirty = m->dirty + m->writeback;
+
+       cred  = hi - dirty;
+       cred *= 1000;
+
+       pend  = rlb_pending(rlb);
+       cred -= pend;
+
+       m->congested = rlb_meminfo_test_high(rlb, m, cred);
+
+       DBG(3, "dirty=%lld (%lld) pend=%llu cred=%lld %s",
+           dirty, dirty * 100 / m->total, pend, cred,
+           m->congested ? "congested" : "");
+
+       if (!m->congested) {
+               rlb_meminfo_dispatch_low(rlb, m, &cred);
+
+               m->congested = rlb_meminfo_test_high(rlb, m, cred);
+       }
+
+       if (m->congested)
+               m->valve.ops->dispatch(rlb, m->valve.data);
+}
+
+static struct ratelimit_ops rlb_meminfo_ops = {
+       .usage    = rlb_meminfo_usage,
+       .create   = rlb_meminfo_create,
+       .destroy  = rlb_meminfo_destroy,
+       .info     = rlb_meminfo_info,
+
+       .settimeo = rlb_meminfo_settimeo,
+       .timeout  = rlb_meminfo_timeout,
+       .dispatch = rlb_meminfo_dispatch,
+};
+
+/*
+ * main loop
+ */
+
+static void
+rlb_info(td_rlb_t *rlb)
+{
+       rlb->valve.ops->info(rlb, rlb->valve.data);
+
+       rlb_conn_infos(rlb);
+}
+
+static sigset_t rlb_sigunblock;
+static sigset_t rlb_sigpending;
+
+static void
+rlb_sigmark(int signo)
+{
+       INFO("Caught SIG%d", signo);
+       sigaddset(&rlb_sigpending, signo);
+}
+
+static int
+rlb_siginit(void)
+{
+       struct sigaction sa_ignore  = { .sa_handler = SIG_IGN };
+       struct sigaction sa_pending = { .sa_handler = rlb_sigmark };
+       sigset_t sigmask;
+       int err = 0;
+
+       if (!err)
+               err = sigaction(SIGPIPE, &sa_ignore, NULL);
+       if (!err)
+               err = sigaction(SIGINT,  &sa_pending, NULL);
+       if (!err)
+               err = sigaction(SIGTERM, &sa_pending, NULL);
+       if (!err)
+               err = sigaction(SIGUSR1, &sa_pending, NULL);
+       if (err) {
+               err = -errno;
+               goto fail;
+       }
+
+       sigemptyset(&sigmask);
+       sigaddset(&sigmask, SIGINT);
+       sigaddset(&sigmask, SIGTERM);
+       sigaddset(&sigmask, SIGUSR1);
+
+       err = sigprocmask(SIG_BLOCK, &sigmask, &rlb_sigunblock);
+       if (err) {
+               err = -errno;
+               goto fail;
+       }
+
+fail:
+       return err;
+}
+
+static int
+rlb_main_signaled(td_rlb_t *rlb)
+{
+       if (sigismember(&rlb_sigpending, SIGUSR1))
+               rlb_info(rlb);
+
+       if (sigismember(&rlb_sigpending, SIGINT) ||
+           sigismember(&rlb_sigpending, SIGTERM))
+               return -EINTR;
+
+       return 0;
+}
+
+
+static struct ratelimit_ops *
+rlb_find_valve(const char *name)
+{
+       struct ratelimit_ops *ops = NULL;
+
+       switch (name[0]) {
+#if 0
+       case 'l':
+               if (!strcmp(name, "leaky"))
+                       ops = &rlb_leaky_ops;
+               break;
+#endif
+
+       case 't':
+               if (!strcmp(name, "token"))
+                       ops = &rlb_token_ops;
+               break;
+
+       case 'm':
+               if (!strcmp(name, "meminfo"))
+                       ops = &rlb_meminfo_ops;
+               break;
+       }
+
+       return ops;
+}
+
+static int
+rlb_main_iterate(td_rlb_t *rlb)
+{
+       td_rlb_conn_t *conn, *next;
+       struct timeval *tv;
+       struct timespec _ts, *ts = &_ts;
+       int nfds, err;
+       fd_set rfds;
+
+       FD_ZERO(&rfds);
+       nfds = 0;
+
+       if (stdin) {
+               FD_SET(STDIN_FILENO, &rfds);
+               nfds = MAX(nfds, STDIN_FILENO);
+       }
+
+       if (rlb->sock >= 0) {
+               FD_SET(rlb->sock, &rfds);
+               nfds = MAX(nfds, rlb->sock);
+       }
+
+       rlb_for_each_conn(conn, rlb) {
+               FD_SET(conn->sock, &rfds);
+               nfds = MAX(nfds, conn->sock);
+       }
+
+       rlb->valve.ops->settimeo(rlb, &tv, rlb->valve.data);
+       if (tv) {
+               TIMEVAL_TO_TIMESPEC(tv, ts);
+       } else
+               ts = NULL;
+
+       rlb->ts = rlb->now;
+
+       nfds = pselect(nfds + 1, &rfds, NULL, NULL, ts, &rlb_sigunblock);
+       if (nfds < 0) {
+               err = -errno;
+               if (err != -EINTR)
+                       PERROR("select");
+               goto fail;
+       }
+
+       gettimeofday(&rlb->now, NULL);
+
+       if (!nfds) {
+               BUG_ON(!ts);
+               rlb->valve.ops->timeout(rlb, rlb->valve.data);
+       }
+
+       if (nfds) {
+               rlb_for_each_conn_safe(conn, next, rlb)
+                       if (FD_ISSET(conn->sock, &rfds)) {
+                               rlb_conn_receive(rlb, conn);
+                               if (!--nfds)
+                                       break;
+                       }
+
+               rlb->valve.ops->dispatch(rlb, rlb->valve.data);
+       }
+
+       if (unlikely(nfds)) {
+               if (FD_ISSET(STDIN_FILENO, &rfds)) {
+                       getc(stdin);
+                       rlb_info(rlb);
+                       nfds--;
+               }
+       }
+
+       if (unlikely(nfds)) {
+               if (FD_ISSET(rlb->sock, &rfds)) {
+                       rlb_accept_conn(rlb);
+                       nfds--;
+               }
+       }
+
+       BUG_ON(nfds);
+       err = 0;
+fail:
+       return err;
+}
+
+static int
+rlb_main_run(td_rlb_t *rlb)
+{
+       int err;
+
+       do {
+               err = rlb_main_iterate(rlb);
+               if (err) {
+                       if (err != -EINTR)
+                               break;
+
+                       err = rlb_main_signaled(rlb);
+                       if (err) {
+                               err = 0;
+                               break;
+                       }
+               }
+
+       } while (rlb->sock >= 0 || !list_empty(&rlb->open));
+
+       return err;
+}
+
+static void
+rlb_shutdown(td_rlb_t *rlb)
+{
+       td_rlb_conn_t *conn, *next;
+
+       rlb_for_each_conn_safe(conn, next, rlb)
+               rlb_conn_close(rlb, conn);
+
+       rlb_sock_close(rlb);
+}
+
+static void
+rlb_usage(td_rlb_t *rlb, const char *prog, FILE *stream)
+{
+       fprintf(stream, "Usage: %s <name>", prog);
+
+       if (rlb && rlb->valve.ops)
+               rlb->valve.ops->usage(rlb, stream, rlb->valve.data);
+       else
+               fprintf(stream,
+                       " {-t|--type}={token|meminfo}"
+                       " [-h|--help] [-D|--debug=<n>]");
+
+       fprintf(stream, "\n");
+}
+
+static void
+rlb_destroy(td_rlb_t *rlb)
+{
+       rlb_shutdown(rlb);
+
+       if (rlb->valve.data) {
+               rlb->valve.ops->destroy(rlb, rlb->valve.data);
+               rlb->valve.data = NULL;
+       }
+
+       if (rlb->name) {
+               free(rlb->name);
+               rlb->name = NULL;
+       }
+}
+
+static int
+rlb_create(td_rlb_t *rlb, const char *name)
+{
+       int i, err;
+
+       memset(rlb, 0, sizeof(*rlb));
+       INIT_LIST_HEAD(&rlb->open);
+       INIT_LIST_HEAD(&rlb->wait);
+       rlb->sock = -1;
+
+       for (i = RLB_CONN_MAX - 1; i >= 0; i--)
+               rlb_conn_free(rlb, &rlb->connv[i]);
+
+       rlb->name = strdup(name);
+       if (!rlb->name) {
+               err = -errno;
+               goto fail;
+       }
+
+       err = rlb_sock_open(rlb);
+       if (err)
+               goto fail;
+
+       gettimeofday(&rlb->now, NULL);
+
+       return 0;
+
+fail:
+       WARN("err = %d", err);
+       rlb_destroy(rlb);
+       return err;
+}
+
+static int
+rlb_create_valve(td_rlb_t *rlb, struct rlb_valve *v,
+                const char *name, int argc, char **argv)
+{
+       struct ratelimit_ops *ops;
+       int err;
+
+       ops = rlb_find_valve(name);
+       if (!ops) {
+               err = -ESRCH;
+               goto fail;
+       }
+
+       v->ops = ops;
+
+       err = v->ops->create(rlb, argc, argv, &v->data);
+
+fail:
+       return err;
+}
+
+static void
+rlb_openlog(const char *name, int facility)
+{
+       static char ident[32];
+
+       snprintf(ident, sizeof(ident), "%s[%d]", name, getpid());
+       ident[sizeof(ident)-1] = 0;
+
+       openlog(ident, 0, facility);
+
+       rlb_vlog = vsyslog;
+}
+
+int
+main(int argc, char **argv)
+{
+       td_rlb_t _rlb, *rlb;
+       const char *prog, *type;
+       int err;
+
+       setbuf(stdin, NULL);
+       setlinebuf(stderr);
+
+       rlb      = NULL;
+       prog     = basename(argv[0]);
+       type     = NULL;
+       rlb_vlog = rlb_vlog_vfprintf;
+
+       do {
+               const struct option longopts[] = {
+                       { "help",        0, NULL, 'h' },
+                       { "type",        1, NULL, 't' },
+                       { "debug",       0, NULL, 'D' },
+                       { NULL,          0, NULL,  0  },
+               };
+               int c;
+
+               c = getopt_long(argc, argv, "ht:D:", longopts, NULL);
+               if (c < 0)
+                       break;
+
+               switch (c) {
+               case 'h':
+                       rlb_usage(NULL, prog, stdout);
+                       return 0;
+
+               case 't':
+                       type = optarg;
+                       break;
+
+               case 'D':
+                       debug = strtoul(optarg, NULL, 0);
+                       break;
+
+               case '?':
+                       goto usage;
+
+               default:
+                       BUG();
+               }
+
+       } while (1);
+
+       if (!type)
+               goto usage;
+
+       if (argc - optind < 1)
+               goto usage;
+
+       err = rlb_siginit();
+       if (err)
+               goto fail;
+
+       err = rlb_create(&_rlb, argv[optind++]);
+       if (err)
+               goto fail;
+
+       rlb = &_rlb;
+
+       rlb_argv_shift(&optind, &argc, &argv);
+
+       err = rlb_create_valve(rlb, &rlb->valve, type, argc, argv);
+       if (err) {
+               if (err == -EINVAL)
+                       goto usage;
+               goto fail;
+       }
+
+       if (!debug) {
+               err = daemon(0, 0);
+               if (err)
+                       goto fail;
+
+               stdin = stdout = stderr = NULL;
+               rlb_openlog(prog, LOG_DAEMON);
+       }
+
+       INFO("TD ratelimit bridge: %s, pid %d", rlb->path, getpid());
+
+       rlb_info(rlb);
+
+       err = rlb_main_run(rlb);
+
+       if (err)
+               INFO("Exiting with status %d", -err);
+
+fail:
+       if (rlb)
+               rlb_destroy(rlb);
+
+       return -err;
+
+usage:
+       rlb_usage(rlb, prog, stderr);
+       err = -EINVAL;
+       goto fail;
+}
index a46062fa17459bac24b4e5bbe14fa8dd79df43a7..f9d199b6df04e0c1605961332f3ceead29fafc66 100644 (file)
@@ -49,6 +49,7 @@ rm -rf $RPM_BUILD_ROOT
 %{_sbindir}/tapdisk2
 %{_sbindir}/tap-ctl
 %{_sbindir}/td-util
+%{_sbindir}/td-rated
 %{_sbindir}/lock-util
 %{_sbindir}/vhd-update
 %{_sbindir}/vhd-util