From db5c23cb63733f8121e3a13709e9adca0cdeca19 Mon Sep 17 00:00:00 2001 From: Daniel Stodden Date: Tue, 15 Feb 2011 01:37:45 -0800 Subject: [PATCH] PR-1129: Rate limiting intro. Simple I/O rate limiting intro. Bridging resource utilization across a group of independent tapdisks. Comprising: - block-valve: Top-level filter issuing bandwidth requests, deferring I/O until acknowledgement. Usage: valve:/path/to/sock/un - td-rated: Standalone bridge process, listening for bandwidth requests, typically from valve:/ instances. Includes a plugin interface for various rate limiting algorithms. Algorithms (yet slightly experimental): - "Token Bucket". A classic, with some trivial modifications to promote batching. - "Meminfo". Watching /proc/meminfo for pagecache congestion. Signed-off-by: Daniel Stodden --- .hgignore | 1 + drivers/Makefile | 3 +- drivers/block-valve.c | 692 +++++++++++++++ drivers/block-valve.h | 41 + drivers/tapdisk-disktype.c | 8 + drivers/tapdisk-disktype.h | 1 + drivers/td-rated.c | 1709 ++++++++++++++++++++++++++++++++++++ mk/blktap.spec.in | 1 + 8 files changed, 2455 insertions(+), 1 deletion(-) create mode 100644 drivers/block-valve.c create mode 100644 drivers/block-valve.h create mode 100644 drivers/td-rated.c diff --git a/.hgignore b/.hgignore index 36f08e5..fb47d53 100644 --- a/.hgignore +++ b/.hgignore @@ -21,6 +21,7 @@ ^drivers/tapdisk-client$ ^drivers/tapdisk-diff$ ^drivers/tapdisk-stream$ +^drivers/td-rated$ ^vhd/vhd-index$ ^vhd/vhd-update$ ^vhd/vhd-util$ diff --git a/drivers/Makefile b/drivers/Makefile index 9c68fe4..f37639f 100644 --- a/drivers/Makefile +++ b/drivers/Makefile @@ -3,7 +3,7 @@ include $(BLKTAP_ROOT)/Rules.mk LIBVHDDIR = $(BLKTAP_ROOT)/vhd/lib -IBIN = tapdisk2 td-util tapdisk-stream tapdisk-diff +IBIN = tapdisk2 td-util td-rated tapdisk-stream tapdisk-diff LOCK_UTIL = lock-util INST_DIR = /usr/sbin @@ -54,6 +54,7 @@ BLK-OBJS := block-aio.o BLK-OBJS += block-ram.o BLK-OBJS += block-cache.o BLK-OBJS += block-vhd.o +BLK-OBJS += 
block-valve.o BLK-OBJS += block-vindex.o BLK-OBJS += block-lcache.o diff --git a/drivers/block-valve.c b/drivers/block-valve.c new file mode 100644 index 0000000..9c0ed74 --- /dev/null +++ b/drivers/block-valve.c @@ -0,0 +1,692 @@ +/* + * Copyright (c) 2010, Citrix Systems, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of XenSource Inc. nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include +#include +#include +#include +#include +#include +#include + +#include "tapdisk.h" +#include "tapdisk-driver.h" +#include "tapdisk-server.h" +#include "tapdisk-interface.h" + +#include "block-valve.h" + +typedef struct td_valve td_valve_t; +typedef struct td_valve_request td_valve_request_t; + +struct td_valve_request { + td_request_t treq; + int secs; + + struct list_head entry; + td_valve_t *valve; +}; + +struct td_valve_stats { + unsigned long long stor; + unsigned long long forw; +}; + +struct td_valve { + char *brname; + unsigned long flags; + + int sock; + event_id_t sock_id; + + event_id_t sched_id; + event_id_t retry_id; + + unsigned int cred; + unsigned int need; + unsigned int done; + + struct list_head stor; + struct list_head forw; + + td_valve_request_t reqv[MAX_REQUESTS]; + td_valve_request_t *free[MAX_REQUESTS]; + int n_free; + + struct td_valve_stats stats; +}; + +#define td_valve_for_each_stored_request(_req, _next, _valve) \ + list_for_each_entry_safe(_req, _next, &(_valve)->stor, entry) + +#define td_valve_for_each_forwarded_request(_req, _next, _valve) \ + list_for_each_entry_safe(_req, _next, &(_valve)->forw, entry) + +#define TD_VALVE_CONNECT_INTERVAL 2 /* s */ + +#define TD_VALVE_RDLIMIT (1<<0) +#define TD_VALVE_WRLIMIT (1<<1) +#define TD_VALVE_KILLED (1<<31) + +static void valve_schedule_retry(td_valve_t *); +static void valve_conn_receive(td_valve_t *); +static void valve_conn_request(td_valve_t *, unsigned long); +static void valve_forward_stored_requests(td_valve_t *); +static void valve_kill(td_valve_t *); + +#define DBG(_f, _a...) if (1) { tlog_syslog(TLOG_DBG, _f, ##_a); } +#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "valve: " _f, ##_a) +#define WARN(_f, _a...) tlog_syslog(TLOG_WARN, "WARNING: "_f " in %s:%d", \ + ##_a, __func__, __LINE__) +#define ERR(_f, _a...) tlog_syslog(TLOG_WARN, "ERROR: " _f " in %s:%d", \ + ##_a, __func__, __LINE__) +#define VERR(_err, _f, _a...) 
tlog_syslog(TLOG_WARN, \ + "ERROR: err=%d (%s), " _f ".", \ + _err, strerror(-(_err)), ##_a) +#undef PERROR +#define PERROR(_f, _a...) VERR(-errno, _f, ##_a) + +#define BUG() do { \ + ERR("Aborting"); \ + td_panic(); \ + } while (0) + +#define BUG_ON(_cond) \ + if (unlikely(_cond)) { \ + ERR("(%s) = %ld", #_cond, (long)(_cond)); \ + BUG(); \ + } + +#define WARN_ON(_cond) ({ \ + int __cond = _cond; \ + if (unlikely(__cond)) \ + WARN("(%s) = %ld", #_cond, (long)(_cond)); \ + __cond; \ +}) + +#define ARRAY_SIZE(_a) (sizeof(_a)/sizeof((_a)[0])) +#define TREQ_SIZE(_treq) ((unsigned int)(_treq.secs) << 9) + +static td_valve_request_t * +valve_alloc_request(td_valve_t *valve) +{ + td_valve_request_t *req = NULL; + + if (valve->n_free) + req = valve->free[--valve->n_free]; + + return req; +} + +static void +valve_free_request(td_valve_t *valve, td_valve_request_t *req) +{ + BUG_ON(valve->n_free >= ARRAY_SIZE(valve->free)); + list_del_init(&req->entry); + valve->free[valve->n_free++] = req; +} + +static void +__valve_sock_event(event_id_t id, char mode, void *private) +{ + td_valve_t *valve = private; + + valve_conn_receive(valve); + + valve_forward_stored_requests(valve); +} + +static void +valve_set_done_pending(td_valve_t *valve) +{ + WARN_ON(valve->done == 0); + tapdisk_server_mask_event(valve->sched_id, 0); +} + +static void +valve_clear_done_pending(td_valve_t *valve) +{ + WARN_ON(valve->done != 0); + tapdisk_server_mask_event(valve->sched_id, 1); +} + +static void +__valve_sched_event(event_id_t id, char mode, void *private) +{ + td_valve_t *valve = private; + + if (likely(valve->done > 0)) + /* flush valve->done */ + valve_conn_request(valve, 0); +} + +static void +valve_sock_close(td_valve_t *valve) +{ + if (valve->sock >= 0) { + close(valve->sock); + valve->sock = -1; + } + + if (valve->sock_id >= 0) { + tapdisk_server_unregister_event(valve->sock_id); + valve->sock_id = -1; + } + + if (valve->sched_id >= 0) { + tapdisk_server_unregister_event(valve->sched_id); + 
valve->sched_id = -1; + } +} + +static int +valve_sock_open(td_valve_t *valve) +{ /* connect to the bridge socket and register its events; returns 0 or a negative error code */ + struct sockaddr_un addr = { .sun_family = AF_UNIX }; + int s, id, err; + + s = socket(AF_UNIX, SOCK_STREAM, 0); + if (s < 0) { + PERROR("socket"); + err = -errno; + goto fail; + } + + valve->sock = s; + + if (valve->brname[0] == '/') + snprintf(addr.sun_path, sizeof(addr.sun_path), + "%s", valve->brname); /* brname is caller-supplied: never use it as the format string */ + else + snprintf(addr.sun_path, sizeof(addr.sun_path), + "%s/%s", TD_VALVE_SOCKDIR, valve->brname); + + err = connect(valve->sock, (struct sockaddr *)&addr, sizeof(addr)); /* connect() takes a generic sockaddr pointer */ + if (err) { + err = -errno; + goto fail; + } + + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, + valve->sock, 0, + __valve_sock_event, + valve); + if (id < 0) { + err = id; + goto fail; + } + + valve->sock_id = id; + + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, + -1, 0, + __valve_sched_event, + valve); + if (id < 0) { + err = id; + goto fail; + } + + valve->sched_id = id; + + INFO("Connected to %s", addr.sun_path); + + valve->cred = 0; + valve->need = 0; + valve->done = 0; + + valve_clear_done_pending(valve); + + return 0; + +fail: + valve_sock_close(valve); + return err; +} + +static int +valve_sock_send(td_valve_t *valve, const void *msg, size_t size) +{ + ssize_t n; + + n = send(valve->sock, msg, size, MSG_DONTWAIT); + if (n < 0) + return -errno; + if (n != size) + return -EPROTO; + + return 0; +} + +static int +valve_sock_recv(td_valve_t *valve, void *msg, size_t size) +{ + ssize_t n; + + n = recv(valve->sock, msg, size, MSG_DONTWAIT); + if (n < 0) + return -errno; + + return n; +} + +static void +__valve_retry_timeout(event_id_t id, char mode, void *private) +{ + td_valve_t *valve = private; + int err; + + err = valve_sock_open(valve); + if (!err) + tapdisk_server_unregister_event(valve->retry_id); +} + +static void +valve_schedule_retry(td_valve_t *valve) +{ + int id; + + BUG_ON(valve->sock_id >= 0); + + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, + -1, TD_VALVE_CONNECT_INTERVAL, + 
__valve_retry_timeout, + valve); + BUG_ON(id < 0); + + valve->retry_id = id; +} + +static void +valve_conn_open(td_valve_t *valve) +{ + int err; + + BUG_ON(valve->flags & TD_VALVE_KILLED); + + err = valve_sock_open(valve); + if (err) { + WARN("%s: %s", valve->brname, strerror(-err)); + valve_schedule_retry(valve); + } +} + +static void +valve_conn_close(td_valve_t *valve, int reset) +{ + td_valve_request_t *req, *next; + + valve_sock_close(valve); + + if (reset) + td_valve_for_each_stored_request(req, next, valve) { + td_forward_request(req->treq); + valve->stats.forw++; + valve_free_request(valve, req); + } + + WARN_ON(!list_empty(&valve->stor)); +} + +static void +valve_conn_reset(td_valve_t *valve) +{ + valve_conn_close(valve, 1); + valve_conn_open(valve); +} + +static void +valve_conn_receive(td_valve_t *valve) +{ /* read credit grants from the bridge and add them to valve->cred; static to match the forward declaration */ + unsigned long buf[32], cred = 0; + ssize_t n; + int i, err; + + n = valve_sock_recv(valve, buf, sizeof(buf)); + if (!n) { + err = -ECONNRESET; + goto reset; + } + + if (n == -EAGAIN) + return; /* nothing to read yet; a negative n must never reach the unsigned division below */ + if (n < 0) { + err = n; + goto reset; + } + for (i = 0; i < n / sizeof(buf[0]); i++) { + err = WARN_ON(buf[i] >= TD_RLB_REQUEST_MAX); + if (err) + goto kill; + + cred += buf[i]; + } + + if (cred > valve->need) { + err = -EINVAL; + goto reset; + } + + valve->cred += cred; + valve->need -= cred; + + return; + +reset: + VERR(err, "resetting connection"); + valve_conn_reset(valve); + return; + +kill: + ERR("Killing valve."); + valve_kill(valve); +} + +static void +valve_conn_request(td_valve_t *valve, unsigned long size) +{ + struct td_valve_req _req; + int err; + + _req.need = size; + _req.done = valve->done; + + valve->need += size; + valve->done = 0; + + valve_clear_done_pending(valve); + + err = valve_sock_send(valve, &_req, sizeof(_req)); + if (!err) + return; + + VERR(err, "resetting connection"); + valve_conn_reset(valve); +} + +static int +valve_expend_request(td_valve_t *valve, const td_request_t treq) +{ + if (valve->flags & TD_VALVE_KILLED) + return 0; + + if 
(valve->sock < 0) + return 0; + + if (valve->cred < TREQ_SIZE(treq)) + return -EAGAIN; + + valve->cred -= TREQ_SIZE(treq); + + return 0; +} + +static void +__valve_complete_treq(td_request_t treq, int error) +{ + td_valve_request_t *req = treq.cb_data; + td_valve_t *valve = req->valve; + + BUG_ON(req->secs < treq.secs); + req->secs -= treq.secs; + + valve->done += TREQ_SIZE(treq); + valve_set_done_pending(valve); + + if (!req->secs) { + td_complete_request(req->treq, error); + valve_free_request(valve, req); + } +} + +static void +valve_forward_stored_requests(td_valve_t *valve) +{ + td_valve_request_t *req, *next; + td_request_t clone; + int err; + + td_valve_for_each_stored_request(req, next, valve) { + + err = valve_expend_request(valve, req->treq); + if (err) + break; + + clone = req->treq; + clone.cb = __valve_complete_treq; + clone.cb_data = req; + + td_forward_request(clone); + valve->stats.forw++; + + list_move(&req->entry, &valve->forw); + } +} + +static int +valve_store_request(td_valve_t *valve, td_request_t treq) +{ + td_valve_request_t *req; + + req = valve_alloc_request(valve); + if (!req) + return -EBUSY; + + valve_conn_request(valve, TREQ_SIZE(treq)); + + req->treq = treq; + req->secs = treq.secs; + + list_add_tail(&req->entry, &valve->stor); + valve->stats.stor++; + + return 0; +} + +static void +valve_kill(td_valve_t *valve) +{ + valve->flags |= TD_VALVE_KILLED; + valve_conn_close(valve, 1); +} + +static void +valve_init(td_valve_t *valve, unsigned long flags) +{ + int i; + + memset(valve, 0, sizeof(*valve)); + + INIT_LIST_HEAD(&valve->stor); + INIT_LIST_HEAD(&valve->forw); + + valve->sock = -1; + valve->sock_id = -1; + + valve->retry_id = -1; + valve->sched_id = -1; + + valve->flags = flags; + + for (i = ARRAY_SIZE(valve->reqv) - 1; i >= 0; i--) { + td_valve_request_t *req = &valve->reqv[i]; + + req->valve = valve; + INIT_LIST_HEAD(&req->entry); + + valve_free_request(valve, req); + } +} + +static int +td_valve_close(td_driver_t *driver) +{ + 
td_valve_t *valve = driver->data; + + WARN_ON(!list_empty(&valve->stor)); + WARN_ON(!list_empty(&valve->forw)); + + valve_conn_close(valve, 0); + + if (valve->brname) { + free(valve->brname); + valve->brname = NULL; + } + + return 0; +} + +static int +td_valve_open(td_driver_t *driver, + const char *name, td_flag_t flags) +{ + td_valve_t *valve = driver->data; + int err; + + valve_init(valve, TD_VALVE_WRLIMIT); + + valve->brname = strdup(name); + if (!valve->brname) { + err = -errno; + goto fail; + } + + valve_conn_open(valve); + + return 0; + +fail: + td_valve_close(driver); + return err; +} + +static void +td_valve_queue_request(td_driver_t *driver, td_request_t treq) +{ + td_valve_t *valve = driver->data; + int err; + + switch (treq.op) { + + case TD_OP_READ: + if (valve->flags & TD_VALVE_RDLIMIT) + break; + + goto forward; + + case TD_OP_WRITE: + if (valve->flags & TD_VALVE_WRLIMIT) + break; + + goto forward; + + default: + BUG(); + } + + err = valve_expend_request(valve, treq); + if (!err) + goto forward; + + err = valve_store_request(valve, treq); + if (err) + td_complete_request(treq, -EBUSY); + + return; + +forward: + td_forward_request(treq); + valve->stats.forw++; +} + +static int +td_valve_get_parent_id(td_driver_t *driver, td_disk_id_t *id) +{ + return -EINVAL; +} + +static int +td_valve_validate_parent(td_driver_t *driver, + td_driver_t *parent_driver, td_flag_t flags) +{ + return -EINVAL; +} + +static void +td_valve_stats(td_driver_t *driver, td_stats_t *st) +{ + td_valve_t *valve = driver->data; + td_valve_request_t *req, *next; + int n_reqs; + + tapdisk_stats_field(st, "bridge", "d", valve->brname); + tapdisk_stats_field(st, "flags", "#x", valve->flags); + + tapdisk_stats_field(st, "cred", "d", valve->cred); + tapdisk_stats_field(st, "need", "d", valve->need); + tapdisk_stats_field(st, "done", "d", valve->done); + + /* + * stored is [ waiting, total-waits ] + */ + + n_reqs = 0; + td_valve_for_each_stored_request(req, next, valve) + n_reqs++; + + 
tapdisk_stats_field(st, "stor", "["); + tapdisk_stats_val(st, "d", n_reqs); + tapdisk_stats_val(st, "llu", valve->stats.stor); + tapdisk_stats_leave(st, ']'); + + /* + * forwarded is [ in-flight, total-requests ] + */ + + n_reqs = 0; + td_valve_for_each_forwarded_request(req, next, valve) + n_reqs++; + + tapdisk_stats_field(st, "forw", "["); + tapdisk_stats_val(st, "d", n_reqs); + tapdisk_stats_val(st, "llu", valve->stats.forw); + tapdisk_stats_leave(st, ']'); +} + +struct tap_disk tapdisk_valve = { + .disk_type = "tapdisk_valve", + .flags = 0, + .private_data_size = sizeof(td_valve_t), + .td_open = td_valve_open, + .td_close = td_valve_close, + .td_queue_read = td_valve_queue_request, + .td_queue_write = td_valve_queue_request, + .td_get_parent_id = td_valve_get_parent_id, + .td_validate_parent = td_valve_validate_parent, + .td_stats = td_valve_stats, +}; diff --git a/drivers/block-valve.h b/drivers/block-valve.h new file mode 100644 index 0000000..344c53c --- /dev/null +++ b/drivers/block-valve.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2011, Citrix Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of XenSource Inc. nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TAPDISK_VALVE_H_ +#define _TAPDISK_VALVE_H_ + +#define TD_VALVE_SOCKDIR "/var/run/blktap/ratelimit" +#define TD_RLB_CONN_MAX 1024 +#define TD_RLB_REQUEST_MAX (8 << 20) + +struct td_valve_req { + unsigned long need; + unsigned long done; +}; + +#endif /* _TAPDISK_VALVE_H_ */ diff --git a/drivers/tapdisk-disktype.c b/drivers/tapdisk-disktype.c index 158cd5f..c1767ae 100644 --- a/drivers/tapdisk-disktype.c +++ b/drivers/tapdisk-disktype.c @@ -99,6 +99,11 @@ static const disk_info_t local_cache_disk = { 0, }; +static const disk_info_t valve_disk = { + "valve", + "group rate limiting (valve)", + 0, +}; const disk_info_t *tapdisk_disk_types[] = { [DISK_TYPE_AIO] = &aio_disk, @@ -112,6 +117,7 @@ const disk_info_t *tapdisk_disk_types[] = { [DISK_TYPE_VINDEX] = &vhd_index_disk, [DISK_TYPE_LOG] = &log_disk, [DISK_TYPE_LOCAL_CACHE] = &local_cache_disk, + [DISK_TYPE_VALVE] = &valve_disk, 0, }; @@ -132,6 +138,7 @@ extern struct tap_disk tapdisk_vhd_index; extern struct tap_disk tapdisk_log; #endif extern struct tap_disk tapdisk_local_cache; +extern struct tap_disk tapdisk_valve; const struct tap_disk *tapdisk_disk_drivers[] = { [DISK_TYPE_AIO] = &tapdisk_aio, @@ -151,6 
+158,7 @@ const struct tap_disk *tapdisk_disk_drivers[] = { [DISK_TYPE_LOG] = &tapdisk_log, #endif [DISK_TYPE_LOCAL_CACHE] = &tapdisk_local_cache, + [DISK_TYPE_VALVE] = &tapdisk_valve, 0, }; diff --git a/drivers/tapdisk-disktype.h b/drivers/tapdisk-disktype.h index abcfd5e..cb5fedf 100644 --- a/drivers/tapdisk-disktype.h +++ b/drivers/tapdisk-disktype.h @@ -41,6 +41,7 @@ #define DISK_TYPE_LOG 9 #define DISK_TYPE_REMUS 10 #define DISK_TYPE_LOCAL_CACHE 11 +#define DISK_TYPE_VALVE 12 #define DISK_TYPE_NAME_MAX 32 diff --git a/drivers/td-rated.c b/drivers/td-rated.c new file mode 100644 index 0000000..8730fa1 --- /dev/null +++ b/drivers/td-rated.c @@ -0,0 +1,1709 @@ +/* + * Copyright (c) 2011, Citrix Systems. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of XenSource Inc. nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "block-valve.h" +#include "compiler.h" +#include "list.h" + +static void +rlb_vlog_vfprintf(int prio, const char *fmt, va_list ap) +{ + vfprintf(stderr, fmt, ap); fputc('\n', stderr); +} + +static void (*rlb_vlog)(int prio, const char *fmt, va_list ap); + +__printf(2, 3) +static void +rlb_log(int prio, const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); rlb_vlog(prio, fmt, ap); va_end(ap); +} + +static int debug = 0; + +#define DBG(_l, _f, _a...) if (debug >= _l) { rlb_log(LOG_DEBUG, _f, ##_a); } +#define INFO(_f, _a...) rlb_log(LOG_INFO, _f, ##_a) +#define WARN(_f, _a...) rlb_log(LOG_WARNING, "WARNING: " _f " in %s:%d", \ + ##_a, __func__, __LINE__) +#define ERR(_f, _a...) rlb_log(LOG_ERR, "ERROR: " _f " in %s:%d", \ + ##_a, __func__, __LINE__) +#define PERROR(_f, _a...) rlb_log(LOG_ERR, _f ": %s in %s:%d", \ + ##_a, strerror(errno), __func__, __LINE__) + +#define BUG() do { \ + ERR("Aborting"); \ + abort(); \ + } while (0) + +#define BUG_ON(_cond) \ + if (unlikely(_cond)) { \ + ERR("(%s) = %d", #_cond, _cond); \ + BUG(); \ + } + +#define WARN_ON(_cond) ({ \ + int __cond = _cond; \ + if (unlikely(__cond)) \ + WARN("(%s) = %d", #_cond, _cond); \ + __cond; \ +}) + +#define MAX(a, b) ((a) > (b) ? (a) : (b)) +#define MIN(a, b) ((a) < (b) ? 
(a) : (b)) + +#define ARRAY_SIZE(_a) (sizeof(_a)/sizeof((_a)[0])) + +typedef struct ratelimit_bridge td_rlb_t; +typedef struct ratelimit_connection td_rlb_conn_t; + +struct ratelimit_connection { + int sock; + + unsigned long need; /* I/O requested */ + unsigned long gntd; /* I/O granted, pending */ + + struct list_head open; /* connected */ + struct list_head wait; /* need > 0 */ + + struct { + struct timeval since; + struct timeval total; + } wstat; +}; + +#define RLB_CONN_MAX 1024 + +struct ratelimit_ops { + void (*usage)(td_rlb_t *rlb, FILE *stream, void *data); + + int (*create)(td_rlb_t *rlb, int argc, char **argv, void **data); + void (*destroy)(td_rlb_t *rlb, void *data); + + void (*info)(td_rlb_t *rlb, void *data); + + void (*settimeo)(td_rlb_t *rlb, struct timeval **tv, void *data); + void (*timeout)(td_rlb_t *rlb, void *data); + void (*dispatch)(td_rlb_t *rlb, void *data); + void (*reset)(td_rlb_t *rlb, void *data); +}; + +struct ratelimit_bridge { + char *name; + char *ident; + + struct sockaddr_un addr; + char *path; + int sock; + + struct list_head open; /* all connections */ + struct list_head wait; /* all in need */ + + struct timeval ts, now; + + td_rlb_conn_t connv[RLB_CONN_MAX]; + td_rlb_conn_t *free[RLB_CONN_MAX]; + int n_free; + + struct rlb_valve { + struct ratelimit_ops *ops; + void *data; + } valve; +}; + +#define rlb_for_each_conn(_conn, _rlb) \ + list_for_each_entry(_conn, &(_rlb)->open, open) + +#define rlb_for_each_conn_safe(_conn, _next, _rlb) \ + list_for_each_entry_safe(_conn, _next, &(_rlb)->open, open) + +#define rlb_for_each_waiting(_conn, _next, _rlb) \ + list_for_each_entry(_conn, _next, &(_rlb)->wait, wait) + +#define rlb_for_each_waiting_safe(_conn, _next, _rlb) \ + list_for_each_entry_safe(_conn, _next, &(_rlb)->wait, wait) + +#define rlb_conn_entry(_list) \ + list_entry(_list, td_rlb_conn_t, open) + +#define rlb_wait_entry(_list) \ + list_entry(_list, td_rlb_conn_t, wait) + +static struct ratelimit_ops *rlb_find_valve(const 
char *name); + +static int rlb_create_valve(td_rlb_t *, struct rlb_valve *, + const char *name, int argc, char **argv); + +/* + * util + */ + +#define case_G case 'G': case 'g' +#define case_M case 'M': case 'm' +#define case_K case 'K': case 'k' + +static long +rlb_strtol(const char *s) +{ + unsigned long l, u = 1; + char *end, p, q; + + l = strtoul(s, &end, 0); + if (!*end) + return l; + + p = *end++; + + switch (p) { + case_G: case_M: case_K: + + q = *end++; + + switch (q) { + case 'i': + switch (p) { + case_G: + u *= 1000; + case_M: + u *= 1000; + case_K: + u *= 1000; + } + break; + + case 0: + switch (p) { + case_G: + u *= 1024; + case_M: + u *= 1024; + case_K: + u *= 1024; + } + break; + + default: + goto fail; + } + break; + + case 0: + break; + + default: + goto fail; + } + + return l * u; + +fail: + return -EINVAL; +} + +static char* +vmprintf(const char *fmt, va_list ap) +{ + char *s; + int n; + + n = vasprintf(&s, fmt, ap); + if (n < 0) + s = NULL; + + return s; +} + +__printf(1, 2) +static char* +mprintf(const char *fmt, ...) +{ + va_list ap; + char *s; + + va_start(ap, fmt); + s = vmprintf(fmt, ap); + va_end(ap); + + return s; +} + +static int +sysctl_vscanf(const char *name, const char *fmt, va_list ap) +{ + char *path = NULL; + FILE *s = NULL; + int rv; + + path = mprintf("/proc/sys/%s", name); + if (!path) { + rv = -errno; + goto fail; + } + + s = fopen(path, "r"); + if (!s) { + rv = -errno; + goto fail; + } + + rv = vfscanf(s, fmt, ap); +fail: + if (s) + fclose(s); + + if (path) + free(path); + + return rv; +} + +static int +sysctl_scanf(const char *name, const char *fmt, ...) 
+{ + va_list(ap); + int rv; + + va_start(ap, fmt); + rv = sysctl_vscanf(name, fmt, ap); + va_end(ap); + + return rv; +} + +static long +sysctl_strtoul(const char *name) +{ + unsigned long val; /* must match the "%lu" conversion below */ + int n; + + n = sysctl_scanf(name, "%lu", &val); + if (n < 0) + return n; + if (n != 1) + return -EINVAL; + + return val; +} + + +static long long +rlb_tv_usec(const struct timeval *tv) +{ + long long us; + + us = tv->tv_sec; + us *= 1000000; + us += tv->tv_usec; + + return us; +} + +static long long +rlb_usec_since(td_rlb_t *rlb, const struct timeval *since) +{ + struct timeval delta; + + timersub(&rlb->now, since, &delta); + + return rlb_tv_usec(&delta); +} + +static inline void +rlb_argv_shift(int *optind, int *argc, char ***argv) +{ + /* reset optind and args after '--' */ + + *optind -= 1; + + *argc -= *optind; + *argv += *optind; + + *optind = 1; +} + +/* + * socket I/O + */ + +static void +rlb_sock_close(td_rlb_t *rlb) +{ + if (rlb->path) { + unlink(rlb->path); + rlb->path = NULL; + } + + if (rlb->sock >= 0) { + close(rlb->sock); + rlb->sock = -1; + } +} + +static int +rlb_sock_open(td_rlb_t *rlb) +{ /* create, bind and listen on the bridge socket; returns 0 or -errno */ + int s, err; + + rlb->sock = -1; + + s = socket(AF_UNIX, SOCK_STREAM, 0); + if (s < 0) { + err = -errno; /* save errno before logging can clobber it */ + PERROR("socket"); + goto fail; + } + + rlb->sock = s; + + rlb->addr.sun_family = AF_UNIX; + snprintf(rlb->addr.sun_path, sizeof(rlb->addr.sun_path), + "%s/%s", TD_VALVE_SOCKDIR, rlb->name); + + err = bind(rlb->sock, (struct sockaddr *)&rlb->addr, sizeof(rlb->addr)); /* bind() takes a generic sockaddr pointer */ + if (err) { + err = -errno; /* save errno before logging can clobber it */ + PERROR("%s", rlb->addr.sun_path); + goto fail; + } + + rlb->path = rlb->addr.sun_path; + + err = listen(rlb->sock, RLB_CONN_MAX); + if (err) { + err = -errno; /* save errno before logging can clobber it */ + PERROR("listen(%s)", rlb->addr.sun_path); + goto fail; + } + + return 0; + +fail: + rlb_sock_close(rlb); + return err; +} + +static int +rlb_sock_send(td_rlb_t *rlb, td_rlb_conn_t *conn, + const void *msg, size_t size) +{ + ssize_t n; + + n = send(conn->sock, msg, size, MSG_DONTWAIT); + if (n < 0) + return -errno; + if (n && n != size) + 
return -EPROTO; + + return 0; +} + +static int +rlb_sock_recv(td_rlb_t *rlb, td_rlb_conn_t *conn, + void *msg, size_t size) +{ + ssize_t n; + + n = recv(conn->sock, msg, size, MSG_DONTWAIT); + if (n < 0) + return -errno; + + return n; +} + +static td_rlb_conn_t * +rlb_conn_alloc(td_rlb_t *rlb) +{ + td_rlb_conn_t *conn = NULL; + + if (likely(rlb->n_free > 0)) + conn = rlb->free[--rlb->n_free]; + + return conn; +} + +static void +rlb_conn_free(td_rlb_t *rlb, td_rlb_conn_t *conn) +{ + BUG_ON(rlb->n_free >= RLB_CONN_MAX); + + rlb->free[rlb->n_free++] = conn; +} + +static int +rlb_conn_id(td_rlb_t *rlb, td_rlb_conn_t *conn) +{ + return conn - rlb->connv; +} + +static void +rlb_conn_info(td_rlb_t *rlb, td_rlb_conn_t *conn) +{ + long long wtime; + int waits; + + wtime = 0; + waits = !list_empty(&conn->wait); + if (waits) + wtime = rlb_usec_since(rlb, &conn->wstat.since) / 1000; + + WARN_ON(!!conn->need != waits); + + INFO("conn[%d] needs %lu (since %llu ms, total %lu.%06lu s)," + " %lu granted", + rlb_conn_id(rlb, conn), conn->need, wtime, + conn->wstat.total.tv_sec, conn->wstat.total.tv_usec, + conn->gntd); +} + +static void +rlb_conn_infos(td_rlb_t *rlb) +{ + td_rlb_conn_t *conn; + + rlb_for_each_conn(conn, rlb) + rlb_conn_info(rlb, conn); +} + +static void +rlb_conn_close(td_rlb_t *rlb, td_rlb_conn_t *conn) +{ /* log, close the socket, unlink from both lists and return conn to the free pool */ + int s = conn->sock; + + INFO("Connection %d closed.", rlb_conn_id(rlb, conn)); + rlb_conn_info(rlb, conn); + + if (s >= 0) { /* 0 is a valid descriptor; only negative values mean "not open" */ + close(s); + conn->sock = -1; + } + + list_del_init(&conn->wait); + list_del(&conn->open); + + rlb_conn_free(rlb, conn); +} + +static void +rlb_conn_receive(td_rlb_t *rlb, td_rlb_conn_t *conn) +{ + struct td_valve_req buf[32], req = { -1, -1 }; + ssize_t n; + int i, err; + + n = rlb_sock_recv(rlb, conn, buf, sizeof(buf)); + if (!n) + goto close; + + if (n == -EAGAIN) + return; /* nothing to read yet; a negative n would fail the unsigned size check below and drop the connection */ + if (n < 0) { + err = n; + goto fail; + } + if (unlikely(n % sizeof(req))) { + err = -EPROTO; + goto fail; + } + + for (i = 0; i < n / sizeof(buf[0]); i++) { + req = buf[i]; 
+ + if (unlikely(req.need > TD_RLB_REQUEST_MAX)) { + err = -EINVAL; + goto fail; + } + + if (unlikely(req.done > conn->gntd)) { + err = -EINVAL; + goto fail; + } + + conn->need += req.need; + conn->gntd -= req.done; + + DBG(8, "rcv: %lu/%lu need=%lu gntd=%lu", + req.need, req.done, conn->need, conn->gntd); + + if (unlikely(conn->need > TD_RLB_REQUEST_MAX)) { + err = -EINVAL; + goto fail; + } + } + + if (conn->need && list_empty(&conn->wait)) { + list_add_tail(&conn->wait, &rlb->wait); + conn->wstat.since = rlb->now; + } + + return; + +fail: + WARN("err = %d (%s)" + " (need %ld/%ld, %ld/%ld done)," + " closing connection.", + err, strerror(-err), + req.need, conn->need, req.done, conn->gntd); + + rlb_conn_info(rlb, conn); +close: + rlb_conn_close(rlb, conn); +} + +static void +rlb_conn_respond(td_rlb_t *rlb, td_rlb_conn_t *conn, unsigned long need) +{ + int err; + + BUG_ON(need > conn->need); + + err = rlb_sock_send(rlb, conn, &need, sizeof(need)); + if (err) + goto fail; + + conn->need -= need; + conn->gntd += need; + + DBG(8, "snd: %lu need=%lu gntd=%lu", need, conn->need, conn->gntd); + + if (!conn->need) { + struct timeval delta; + + timersub(&rlb->now, &conn->wstat.since, &delta); + timeradd(&conn->wstat.total, &delta, &conn->wstat.total); + + list_del_init(&conn->wait); + } + + return; + +fail: + WARN("err = %d, killing connection.", err); + rlb_conn_close(rlb, conn); +} + +static void +rlb_accept_conn(td_rlb_t *rlb) +{ /* accept one pending connection and add it to rlb->open; failures are only logged */ + td_rlb_conn_t *conn; + int s, err; + + s = accept(rlb->sock, NULL, NULL); + if (s < 0) { /* accept() reports failure with -1, not 0 */ + err = -errno; + goto fail; + } + + conn = rlb_conn_alloc(rlb); + if (!conn) { + err = -ENOMEM; + close(s); + goto fail; + } + + INFO("Accepting connection %d.", rlb_conn_id(rlb, conn)); /* int index, matching the %d conversion */ + + memset(conn, 0, sizeof(*conn)); + INIT_LIST_HEAD(&conn->wait); + conn->sock = s; + list_add_tail(&conn->open, &rlb->open); + + return; + +fail: + WARN("err = %d", err); +} + +static long long +rlb_pending(td_rlb_t *rlb) +{ + td_rlb_conn_t *conn; + long long pend = 0; + + 
rlb_for_each_conn(conn, rlb) + pend += conn->gntd; + + return pend; +} + +/* + * token bucket valve + */ + +typedef struct ratelimit_token td_rlb_token_t; + +struct ratelimit_token { + long cred; + long cap; + long rate; + struct timeval timeo; +}; + +static void +rlb_token_settimeo(td_rlb_t *rlb, struct timeval **_tv, void *data) +{ + td_rlb_token_t *token = data; + struct timeval *tv = &token->timeo; + long long us; + + if (list_empty(&rlb->wait)) { + *_tv = NULL; + return; + } + + WARN_ON(token->cred >= 0); + + us = -token->cred; + us *= 1000000; + us /= token->rate; + + tv->tv_sec = us / 1000000; + tv->tv_usec = us % 1000000; + + WARN_ON(!timerisset(tv)); + + *_tv = tv; +} + +static void +rlb_token_refill(td_rlb_t *rlb, td_rlb_token_t *token) +{ + struct timeval tv; + long long cred, max_usec; + + /* max time needed to refill up to cap */ + + max_usec = token->cap - token->cred; + max_usec *= 1000000; + max_usec += token->rate - 1; + max_usec /= token->rate; + + /* actual credit gained */ + + timersub(&rlb->now, &rlb->ts, &tv); + + cred = rlb_tv_usec(&tv); + cred = MIN(cred, max_usec); + cred *= token->rate; + cred /= 1000000; + + /* up to cap */ + + token->cred += cred; + token->cred = MIN(token->cred, token->cap); +} + +static void +rlb_token_dispatch(td_rlb_t *rlb, void *data) +{ + td_rlb_token_t *token = data; + td_rlb_conn_t *conn, *next; + + rlb_token_refill(rlb, token); + + rlb_for_each_waiting_safe(conn, next, rlb) { + if (token->cred < 0) + break; + + token->cred -= conn->need; + + rlb_conn_respond(rlb, conn, conn->need); + } +} + +static void +rlb_token_reset(td_rlb_t *rlb, void *data) +{ + td_rlb_token_t *token = data; + + token->cred = token->cap; +} + +static void +rlb_token_destroy(td_rlb_t *rlb, void *data) +{ + td_rlb_token_t *token = data; + + if (token) + free(token); +} + +static int +rlb_token_create(td_rlb_t *rlb, int argc, char **argv, void **data) +{ + td_rlb_token_t *token; + int err; + + token = calloc(1, sizeof(*token)); + if (!token) 
{ + err = -ENOMEM; + goto fail; + } + + token->rate = 0; + token->cap = 0; + + do { + const struct option longopts[] = { + { "rate", 1, NULL, 'r' }, + { "cap", 1, NULL, 'c' }, + { NULL, 0, NULL, 0 } + }; + int c; + + c = getopt_long(argc, argv, "r:c:", longopts, NULL); + if (c < 0) + break; + + switch (c) { + case 'r': + token->rate = rlb_strtol(optarg); + if (token->rate < 0) { + ERR("invalid --rate"); + goto usage; + } + break; + + case 'c': + token->cap = rlb_strtol(optarg); + if (token->cap < 0) { + ERR("invalid --cap"); + goto usage; + } + break; + + case '?': + goto usage; + + default: + BUG(); + } + } while (1); + + if (!token->rate) { + ERR("--rate required"); + goto usage; + } + + rlb_token_reset(rlb, token); + + *data = token; + + return 0; + +fail: + if (token) + free(token); + + return err; + +usage: + err = -EINVAL; + goto fail; +} + +static void +rlb_token_usage(td_rlb_t *rlb, FILE *stream, void *data) +{ + fprintf(stream, + " {-t|--type}=token --" + " {-r|--rate}=" + " {-c|--cap}="); +} + +static void +rlb_token_info(td_rlb_t *rlb, void *data) +{ + td_rlb_token_t *token = data; + + INFO("TOKEN: rate: %ld B/s cap: %ld B cred: %ld B", + token->rate, token->cap, token->cred); +} + +static struct ratelimit_ops rlb_token_ops = { + .usage = rlb_token_usage, + .create = rlb_token_create, + .destroy = rlb_token_destroy, + .info = rlb_token_info, + + .settimeo = rlb_token_settimeo, + .timeout = rlb_token_dispatch, + .dispatch = rlb_token_dispatch, + .reset = rlb_token_reset, +}; + +/* + * meminfo valve + */ + +typedef struct ratelimit_meminfo td_rlb_meminfo_t; + +struct ratelimit_meminfo { + unsigned int period; + struct timeval ts; + + FILE *s; + + unsigned long total; + unsigned long dirty; + unsigned long writeback; + + unsigned int limit_hi; + unsigned int limit_lo; + unsigned int congested; + + struct rlb_valve valve; + struct timeval timeo; +}; + +static void +rlb_meminfo_info(td_rlb_t *rlb, void *data) +{ + td_rlb_meminfo_t *m = data; + + 
INFO("MEMINFO: lo/hi: %u/%u%% period: %u ms", + m->limit_lo, m->limit_hi, m->period); + + INFO("MEMINFO: total %lu kB, dirty/writeback %lu/%lu kB", + m->total, m->dirty, m->writeback); + + m->valve.ops->info(rlb, m->valve.data); +} + +static void +rlb_meminfo_close(td_rlb_meminfo_t *m) +{ + if (m->s) { + fclose(m->s); + m->s = NULL; + } +} + +static int +rlb_meminfo_open(td_rlb_meminfo_t *m) +{ + FILE *s; + int err; + + m->s = NULL; + + s = fopen("/proc/meminfo", "r"); + if (!s) { + err = -errno; + goto fail; + } + + m->s = s; + + return 0; + +fail: + rlb_meminfo_close(m); + return err; +} + +static inline int __test_bit(int n, unsigned long *bitmap) +{ + return !!(*bitmap & (1UL<s); + if (!b) + break; + + for (i = 0; i < n_keys; i++) { + struct ratelimit_meminfo_scan *scan; + unsigned long val, *ptr; + int n; + + if (!__test_bit(i, &pending)) + continue; + + scan = &rlb_meminfo_scanfs[i]; + + n = sscanf(buf, scan->format, &val); + if (n != 1) + continue; + + ptr = (void*)m + scan->ptrdiff; + *ptr = val; + + __clear_bit(i, &pending); + } + + } while (pending); + + if (pending) { + err = -ESRCH; + goto fail; + } + + err = 0; +fail: + rlb_meminfo_close(m); + return err; +} + +static void +rlb_meminfo_usage(td_rlb_t *rlb, FILE *stream, void *data) +{ + td_rlb_meminfo_t *m = data; + + fprintf(stream, + " {-t|--type}=meminfo " + " {-H|--high}= {-l|--low}=" + " {-p|--period}= --"); + + if (m && m->valve.ops) { + m->valve.ops->usage(rlb, stream, m->valve.data); + } else + fprintf(stream, " {-t|--type}={...}"); +} + +static void +rlb_meminfo_destroy(td_rlb_t *rlb, void *data) +{ + td_rlb_meminfo_t *m = data; + + if (m) { + if (m->valve.data) { + m->valve.ops->destroy(rlb, m->valve.data); + m->valve.data = NULL; + } + + free(m); + } +} + +static int +rlb_meminfo_create(td_rlb_t *rlb, int argc, char **argv, void **data) +{ + td_rlb_meminfo_t *m; + const char *type; + long dbr; + int err; + + m = calloc(1, sizeof(*m)); + if (!m) { + err = -errno; + goto fail; + } + + type = 
NULL; + m->period = 100; + + do { + const struct option longopts[] = { + { "period", 1, NULL, 'p' }, + { "type", 1, NULL, 't' }, + { "high", 1, NULL, 'H' }, + { "low", 1, NULL, 'L' }, + { NULL, 0, NULL, 0 } + }; + int c; + + c = getopt_long(argc, argv, "p:t:H:L:", longopts, NULL); + if (c < 0) + break; + + switch (c) { + case 'p': + m->period = rlb_strtol(optarg); + if (m->period < 0) + goto usage; + break; + + case 'H': + m->limit_hi = strtoul(optarg, NULL, 0); + break; + + case 'L': + m->limit_lo = strtoul(optarg, NULL, 0); + break; + + case 't': + type = optarg; + break; + + case '?': + goto usage; + + default: + BUG(); + } + } while (1); + + if (!type) { + ERR("--type required"); + goto usage; + } + + if (!m->limit_hi || !m->limit_lo) { + ERR("--high/--low required"); + goto usage; + } + + if (m->limit_lo >= m->limit_hi) { + ERR("invalid --high/--low ratio"); + goto usage; + } + + dbr = sysctl_strtoul("vm/dirty_background_ratio"); + if (dbr < 0) { + err = dbr; + goto fail; + } + + if (0 && m->limit_lo < dbr) { + ERR("--low %u is less than vm.dirty_background_ratio (= %ld)", + m->limit_lo, dbr); + err = -EINVAL; + goto fail; + } + + *data = m; + + rlb_argv_shift(&optind, &argc, &argv); + + err = rlb_create_valve(rlb, &m->valve, type, argc, argv); + if (err) { + if (err == -EINVAL) + goto usage; + goto fail; + } + + err = rlb_meminfo_scan(m); + if (err) + goto fail; + + return 0; + +fail: + rlb_meminfo_destroy(rlb, m); + WARN("err = %d", err); + return err; + +usage: + err = -EINVAL; + return err; +}; + +static void +rlb_meminfo_settimeo(td_rlb_t *rlb, struct timeval **_tv, void *data) +{ + td_rlb_meminfo_t *m = data; + int idle; + + idle = list_empty(&rlb->wait); + BUG_ON(!idle && !m->congested); + + if (m->congested) { + m->valve.ops->settimeo(rlb, _tv, m->valve.data); + return; + } + + *_tv = NULL; +} + +static void +rlb_meminfo_timeout(td_rlb_t *rlb, void *data) +{ + td_rlb_meminfo_t *m = data; + + WARN_ON(!m->congested); + + if (m->congested) + 
m->valve.ops->timeout(rlb, m->valve.data); +} + +static int +rlb_meminfo_test_high(td_rlb_t *rlb, td_rlb_meminfo_t *m, long long cred) +{ + long long lo; + + if (m->congested) { + /* hysteresis */ + + lo = m->total; + lo *= m->limit_lo; + lo /= 100; + + if (cred >= lo) + return 0; + + } else + if (cred <= 0) { + m->valve.ops->reset(rlb, m->valve.data); + return 1; + } + + return m->congested; +} + +static void +rlb_meminfo_dispatch_low(td_rlb_t *rlb, td_rlb_meminfo_t *m, + long long *_cred) +{ + td_rlb_conn_t *conn, *next; + long long cred = *_cred, grant; + + rlb_for_each_waiting_safe(conn, next, rlb) { + + if (cred <= 0) + break; + + grant = MIN(cred, conn->need); + + rlb_conn_respond(rlb, conn, grant); + + cred -= grant; + } + + *_cred = cred; +} + +static void +rlb_meminfo_dispatch(td_rlb_t *rlb, void *data) +{ + td_rlb_meminfo_t *m = data; + long long us, hi, cred, dirty, pend; + + /* we run only once per m->period */ + + us = rlb_usec_since(rlb, &m->ts); + if (us / 1000 > m->period) { + rlb_meminfo_scan(m); + m->ts = rlb->now; + } + + /* uncongested credit: + memory below hi watermark minus pending I/O */ + + hi = m->total; + hi *= m->limit_hi; + hi /= 100; + + dirty = m->dirty + m->writeback; + + cred = hi - dirty; + cred *= 1000; + + pend = rlb_pending(rlb); + cred -= pend; + + m->congested = rlb_meminfo_test_high(rlb, m, cred); + + DBG(3, "dirty=%lld (%lld) pend=%llu cred=%lld %s", + dirty, dirty * 100 / m->total, pend, cred, + m->congested ? 
"congested" : ""); + + if (!m->congested) { + rlb_meminfo_dispatch_low(rlb, m, &cred); + + m->congested = rlb_meminfo_test_high(rlb, m, cred); + } + + if (m->congested) + m->valve.ops->dispatch(rlb, m->valve.data); +} + +static struct ratelimit_ops rlb_meminfo_ops = { + .usage = rlb_meminfo_usage, + .create = rlb_meminfo_create, + .destroy = rlb_meminfo_destroy, + .info = rlb_meminfo_info, + + .settimeo = rlb_meminfo_settimeo, + .timeout = rlb_meminfo_timeout, + .dispatch = rlb_meminfo_dispatch, +}; + +/* + * main loop + */ + +static void +rlb_info(td_rlb_t *rlb) +{ + rlb->valve.ops->info(rlb, rlb->valve.data); + + rlb_conn_infos(rlb); +} + +static sigset_t rlb_sigunblock; +static sigset_t rlb_sigpending; + +static void +rlb_sigmark(int signo) +{ + INFO("Caught SIG%d", signo); + sigaddset(&rlb_sigpending, signo); +} + +static int +rlb_siginit(void) +{ + struct sigaction sa_ignore = { .sa_handler = SIG_IGN }; + struct sigaction sa_pending = { .sa_handler = rlb_sigmark }; + sigset_t sigmask; + int err = 0; + + if (!err) + err = sigaction(SIGPIPE, &sa_ignore, NULL); + if (!err) + err = sigaction(SIGINT, &sa_pending, NULL); + if (!err) + err = sigaction(SIGTERM, &sa_pending, NULL); + if (!err) + err = sigaction(SIGUSR1, &sa_pending, NULL); + if (err) { + err = -errno; + goto fail; + } + + sigemptyset(&sigmask); + sigaddset(&sigmask, SIGINT); + sigaddset(&sigmask, SIGTERM); + sigaddset(&sigmask, SIGUSR1); + + err = sigprocmask(SIG_BLOCK, &sigmask, &rlb_sigunblock); + if (err) { + err = -errno; + goto fail; + } + +fail: + return err; +} + +static int +rlb_main_signaled(td_rlb_t *rlb) +{ + if (sigismember(&rlb_sigpending, SIGUSR1)) + rlb_info(rlb); + + if (sigismember(&rlb_sigpending, SIGINT) || + sigismember(&rlb_sigpending, SIGTERM)) + return -EINTR; + + return 0; +} + + +static struct ratelimit_ops * +rlb_find_valve(const char *name) +{ + struct ratelimit_ops *ops = NULL; + + switch (name[0]) { +#if 0 + case 'l': + if (!strcmp(name, "leaky")) + ops = &rlb_leaky_ops; + 
break; +#endif + + case 't': + if (!strcmp(name, "token")) + ops = &rlb_token_ops; + break; + + case 'm': + if (!strcmp(name, "meminfo")) + ops = &rlb_meminfo_ops; + break; + } + + return ops; +} + +static int +rlb_main_iterate(td_rlb_t *rlb) +{ + td_rlb_conn_t *conn, *next; + struct timeval *tv; + struct timespec _ts, *ts = &_ts; + int nfds, err; + fd_set rfds; + + FD_ZERO(&rfds); + nfds = 0; + + if (stdin) { + FD_SET(STDIN_FILENO, &rfds); + nfds = MAX(nfds, STDIN_FILENO); + } + + if (rlb->sock >= 0) { + FD_SET(rlb->sock, &rfds); + nfds = MAX(nfds, rlb->sock); + } + + rlb_for_each_conn(conn, rlb) { + FD_SET(conn->sock, &rfds); + nfds = MAX(nfds, conn->sock); + } + + rlb->valve.ops->settimeo(rlb, &tv, rlb->valve.data); + if (tv) { + TIMEVAL_TO_TIMESPEC(tv, ts); + } else + ts = NULL; + + rlb->ts = rlb->now; + + nfds = pselect(nfds + 1, &rfds, NULL, NULL, ts, &rlb_sigunblock); + if (nfds < 0) { + err = -errno; + if (err != -EINTR) + PERROR("select"); + goto fail; + } + + gettimeofday(&rlb->now, NULL); + + if (!nfds) { + BUG_ON(!ts); + rlb->valve.ops->timeout(rlb, rlb->valve.data); + } + + if (nfds) { + rlb_for_each_conn_safe(conn, next, rlb) + if (FD_ISSET(conn->sock, &rfds)) { + rlb_conn_receive(rlb, conn); + if (!--nfds) + break; + } + + rlb->valve.ops->dispatch(rlb, rlb->valve.data); + } + + if (unlikely(nfds)) { + if (FD_ISSET(STDIN_FILENO, &rfds)) { + getc(stdin); + rlb_info(rlb); + nfds--; + } + } + + if (unlikely(nfds)) { + if (FD_ISSET(rlb->sock, &rfds)) { + rlb_accept_conn(rlb); + nfds--; + } + } + + BUG_ON(nfds); + err = 0; +fail: + return err; +} + +static int +rlb_main_run(td_rlb_t *rlb) +{ + int err; + + do { + err = rlb_main_iterate(rlb); + if (err) { + if (err != -EINTR) + break; + + err = rlb_main_signaled(rlb); + if (err) { + err = 0; + break; + } + } + + } while (rlb->sock >= 0 || !list_empty(&rlb->open)); + + return err; +} + +static void +rlb_shutdown(td_rlb_t *rlb) +{ + td_rlb_conn_t *conn, *next; + + rlb_for_each_conn_safe(conn, next, rlb) + 
rlb_conn_close(rlb, conn); + + rlb_sock_close(rlb); +} + +static void +rlb_usage(td_rlb_t *rlb, const char *prog, FILE *stream) +{ + fprintf(stream, "Usage: %s ", prog); + + if (rlb && rlb->valve.ops) + rlb->valve.ops->usage(rlb, stream, rlb->valve.data); + else + fprintf(stream, + " {-t|--type}={token|meminfo}" + " [-h|--help] [-D|--debug=]"); + + fprintf(stream, "\n"); +} + +static void +rlb_destroy(td_rlb_t *rlb) +{ + rlb_shutdown(rlb); + + if (rlb->valve.data) { + rlb->valve.ops->destroy(rlb, rlb->valve.data); + rlb->valve.data = NULL; + } + + if (rlb->name) { + free(rlb->name); + rlb->name = NULL; + } +} + +static int +rlb_create(td_rlb_t *rlb, const char *name) +{ + int i, err; + + memset(rlb, 0, sizeof(*rlb)); + INIT_LIST_HEAD(&rlb->open); + INIT_LIST_HEAD(&rlb->wait); + rlb->sock = -1; + + for (i = RLB_CONN_MAX - 1; i >= 0; i--) + rlb_conn_free(rlb, &rlb->connv[i]); + + rlb->name = strdup(name); + if (!rlb->name) { + err = -errno; + goto fail; + } + + err = rlb_sock_open(rlb); + if (err) + goto fail; + + gettimeofday(&rlb->now, NULL); + + return 0; + +fail: + WARN("err = %d", err); + rlb_destroy(rlb); + return err; +} + +static int +rlb_create_valve(td_rlb_t *rlb, struct rlb_valve *v, + const char *name, int argc, char **argv) +{ + struct ratelimit_ops *ops; + int err; + + ops = rlb_find_valve(name); + if (!ops) { + err = -ESRCH; + goto fail; + } + + v->ops = ops; + + err = v->ops->create(rlb, argc, argv, &v->data); + +fail: + return err; +} + +static void +rlb_openlog(const char *name, int facility) +{ + static char ident[32]; + + snprintf(ident, sizeof(ident), "%s[%d]", name, getpid()); + ident[sizeof(ident)-1] = 0; + + openlog(ident, 0, facility); + + rlb_vlog = vsyslog; +} + +int +main(int argc, char **argv) +{ + td_rlb_t _rlb, *rlb; + const char *prog, *type; + int err; + + setbuf(stdin, NULL); + setlinebuf(stderr); + + rlb = NULL; + prog = basename(argv[0]); + type = NULL; + rlb_vlog = rlb_vlog_vfprintf; + + do { + const struct option longopts[] = { + 
{ "help", 0, NULL, 'h' }, + { "type", 1, NULL, 't' }, + { "debug", 0, NULL, 'D' }, + { NULL, 0, NULL, 0 }, + }; + int c; + + c = getopt_long(argc, argv, "ht:D:", longopts, NULL); + if (c < 0) + break; + + switch (c) { + case 'h': + rlb_usage(NULL, prog, stdout); + return 0; + + case 't': + type = optarg; + break; + + case 'D': + debug = strtoul(optarg, NULL, 0); + break; + + case '?': + goto usage; + + default: + BUG(); + } + + } while (1); + + if (!type) + goto usage; + + if (argc - optind < 1) + goto usage; + + err = rlb_siginit(); + if (err) + goto fail; + + err = rlb_create(&_rlb, argv[optind++]); + if (err) + goto fail; + + rlb = &_rlb; + + rlb_argv_shift(&optind, &argc, &argv); + + err = rlb_create_valve(rlb, &rlb->valve, type, argc, argv); + if (err) { + if (err == -EINVAL) + goto usage; + goto fail; + } + + if (!debug) { + err = daemon(0, 0); + if (err) + goto fail; + + stdin = stdout = stderr = NULL; + rlb_openlog(prog, LOG_DAEMON); + } + + INFO("TD ratelimit bridge: %s, pid %d", rlb->path, getpid()); + + rlb_info(rlb); + + err = rlb_main_run(rlb); + + if (err) + INFO("Exiting with status %d", -err); + +fail: + if (rlb) + rlb_destroy(rlb); + + return -err; + +usage: + rlb_usage(rlb, prog, stderr); + err = -EINVAL; + goto fail; +} diff --git a/mk/blktap.spec.in b/mk/blktap.spec.in index a46062f..f9d199b 100644 --- a/mk/blktap.spec.in +++ b/mk/blktap.spec.in @@ -49,6 +49,7 @@ rm -rf $RPM_BUILD_ROOT %{_sbindir}/tapdisk2 %{_sbindir}/tap-ctl %{_sbindir}/td-util +%{_sbindir}/td-rated %{_sbindir}/lock-util %{_sbindir}/vhd-update %{_sbindir}/vhd-util -- 2.39.5