--- /dev/null
+/*
+ * Copyright (c) 2010, Citrix Systems, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of XenSource Inc. nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "tapdisk.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-server.h"
+#include "tapdisk-interface.h"
+
+#include "block-valve.h"
+
+typedef struct td_valve td_valve_t;
+typedef struct td_valve_request td_valve_request_t;
+
+/*
+ * Bookkeeping for one request held or tracked by the valve.
+ * Instances live in td_valve.reqv and are handed out by
+ * valve_alloc_request().
+ */
+struct td_valve_request {
+ /* copy of the request as it was queued to the driver */
+ td_request_t treq;
+ /* sectors of treq not yet reported complete */
+ int secs;
+
+ /* link on td_valve.stor or td_valve.forw */
+ struct list_head entry;
+ /* owning valve, set once in valve_init() */
+ td_valve_t *valve;
+};
+
+/* Running counters reported by td_valve_stats(). */
+struct td_valve_stats {
+ /* requests that had to be stored while waiting for credit */
+ unsigned long long stor;
+ /* requests forwarded */
+ unsigned long long forw;
+};
+
+/* Per-driver state of the valve. */
+struct td_valve {
+ /* bridge name; absolute socket path if it starts with '/' */
+ char *brname;
+ /* TD_VALVE_* bits */
+ unsigned long flags;
+
+ /* bridge connection: socket fd and its read event, -1 when closed */
+ int sock;
+ event_id_t sock_id;
+
+ /* event used to flush 'done'; event retrying the connection */
+ event_id_t sched_id;
+ event_id_t retry_id;
+
+ /* bytes of credit granted by the bridge and not yet spent */
+ unsigned int cred;
+ /* bytes requested from the bridge and not yet granted */
+ unsigned int need;
+ /* bytes completed and not yet reported to the bridge */
+ unsigned int done;
+
+ /* requests waiting for credit / requests forwarded via the valve */
+ struct list_head stor;
+ struct list_head forw;
+
+ /* request pool and the stack of free pool entries */
+ td_valve_request_t reqv[MAX_REQUESTS];
+ td_valve_request_t *free[MAX_REQUESTS];
+ int n_free;
+
+ struct td_valve_stats stats;
+};
+
+/* Walk the requests waiting for credit; the current entry may be unlinked. */
+#define td_valve_for_each_stored_request(_req, _next, _valve)		\
+	list_for_each_entry_safe(_req, _next, &(_valve)->stor, entry)
+
+/* Walk the requests on the forwarded list; the current entry may be unlinked. */
+#define td_valve_for_each_forwarded_request(_req, _next, _valve)	\
+	list_for_each_entry_safe(_req, _next, &(_valve)->forw, entry)
+
+#define TD_VALVE_CONNECT_INTERVAL 2 /* s */
+
+/*
+ * Bits in td_valve.flags, which is an unsigned long.  The constants are
+ * unsigned long as well: (1<<31) overflows a signed int and would
+ * sign-extend into the upper flag bits on LP64.
+ */
+#define TD_VALVE_RDLIMIT (1UL<<0)
+#define TD_VALVE_WRLIMIT (1UL<<1)
+#define TD_VALVE_KILLED (1UL<<31)
+
+static void valve_schedule_retry(td_valve_t *);
+static void valve_conn_receive(td_valve_t *);
+static void valve_conn_request(td_valve_t *, unsigned long);
+static void valve_forward_stored_requests(td_valve_t *);
+static void valve_kill(td_valve_t *);
+
+/*
+ * Logging and assertion helpers.
+ *
+ * Statement-like macros are wrapped in do { } while (0) so they behave as
+ * a single statement inside if/else, and BUG_ON/WARN_ON evaluate their
+ * condition exactly once.
+ */
+#define DBG(_f, _a...) do { tlog_syslog(TLOG_DBG, _f, ##_a); } while (0)
+#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "valve: " _f, ##_a)
+#define WARN(_f, _a...) tlog_syslog(TLOG_WARN, "WARNING: "_f " in %s:%d", \
+				    ##_a, __func__, __LINE__)
+#define ERR(_f, _a...) tlog_syslog(TLOG_WARN, "ERROR: " _f " in %s:%d", \
+				   ##_a, __func__, __LINE__)
+/* _err is a negative errno value */
+#define VERR(_err, _f, _a...) tlog_syslog(TLOG_WARN, \
+					  "ERROR: err=%d (%s), " _f ".", \
+					  _err, strerror(-(_err)), ##_a)
+#undef PERROR
+#define PERROR(_f, _a...) VERR(-errno, _f, ##_a)
+
+#define BUG() do {						\
+		ERR("Aborting");				\
+		td_panic();					\
+	} while (0)
+
+#define BUG_ON(_cond) do {					\
+		long __bug = (long)(_cond);			\
+		if (unlikely(__bug)) {				\
+			ERR("(%s) = %ld", #_cond, __bug);	\
+			BUG();					\
+		}						\
+	} while (0)
+
+/* Evaluates to the (int) value of the condition. */
+#define WARN_ON(_cond) ({					\
+	int __cond = (_cond);					\
+	if (unlikely(__cond))					\
+		WARN("(%s) = %ld", #_cond, (long)__cond);	\
+	__cond;							\
+})
+
+#define ARRAY_SIZE(_a) (sizeof(_a)/sizeof((_a)[0]))
+/* request size in bytes: sectors << 9 */
+#define TREQ_SIZE(_treq) ((unsigned int)((_treq).secs) << 9)
+
+/*
+ * Pop an entry off the valve's free stack.
+ * Returns NULL when no free entry is left.
+ */
+static td_valve_request_t *
+valve_alloc_request(td_valve_t *valve)
+{
+	if (!valve->n_free)
+		return NULL;
+
+	valve->n_free--;
+
+	return valve->free[valve->n_free];
+}
+
+/*
+ * Unlink @req from whatever list it is on and push it back onto the
+ * free stack.  Overflowing the stack is a bug.
+ */
+static void
+valve_free_request(td_valve_t *valve, td_valve_request_t *req)
+{
+	BUG_ON(valve->n_free >= ARRAY_SIZE(valve->free));
+
+	list_del_init(&req->entry);
+
+	valve->free[valve->n_free] = req;
+	valve->n_free++;
+}
+
+/*
+ * Read-event callback for the bridge socket: take in whatever the bridge
+ * sent, then forward as many stored requests as the credit allows.
+ */
+static void
+__valve_sock_event(event_id_t id, char mode, void *private)
+{
+	td_valve_t *v = private;
+
+	valve_conn_receive(v);
+	valve_forward_stored_requests(v);
+}
+
+/*
+ * Unmask the flush event; expected to be called with a non-zero
+ * valve->done (warns otherwise).
+ */
+static void
+valve_set_done_pending(td_valve_t *valve)
+{
+	const event_id_t id = valve->sched_id;
+
+	WARN_ON(valve->done == 0);
+
+	tapdisk_server_mask_event(id, 0);
+}
+
+/*
+ * Mask the flush event; expected to be called once valve->done has been
+ * reset to zero (warns otherwise).
+ */
+static void
+valve_clear_done_pending(td_valve_t *valve)
+{
+	const event_id_t id = valve->sched_id;
+
+	WARN_ON(valve->done != 0);
+
+	tapdisk_server_mask_event(id, 1);
+}
+
+/*
+ * Flush-event callback: report the accumulated valve->done to the bridge
+ * by sending a message that asks for no additional credit.
+ */
+static void
+__valve_sched_event(event_id_t id, char mode, void *private)
+{
+	td_valve_t *v = private;
+
+	if (unlikely(!(v->done > 0)))
+		return;
+
+	valve_conn_request(v, 0);
+}
+
+/*
+ * Close the bridge socket and drop the socket and flush events.
+ * Each resource is released only if currently held, so the function may
+ * be called on a partially opened or already closed connection.
+ */
+static void
+valve_sock_close(td_valve_t *valve)
+{
+	const int fd = valve->sock;
+
+	if (fd >= 0) {
+		valve->sock = -1;
+		close(fd);
+	}
+
+	if (valve->sock_id >= 0) {
+		tapdisk_server_unregister_event(valve->sock_id);
+		valve->sock_id = -1;
+	}
+
+	if (valve->sched_id >= 0) {
+		tapdisk_server_unregister_event(valve->sched_id);
+		valve->sched_id = -1;
+	}
+}
+
+/*
+ * Connect to the bridge socket and register the socket and flush events.
+ *
+ * The socket path is valve->brname itself when it is absolute, otherwise
+ * TD_VALVE_SOCKDIR/brname.  On success the credit accounting is reset.
+ *
+ * Returns 0 on success or a negative errno value; on failure everything
+ * acquired so far is released again.
+ */
+static int
+valve_sock_open(td_valve_t *valve)
+{
+	struct sockaddr_un addr = { .sun_family = AF_UNIX };
+	int s, id, err, n;
+
+	s = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (s < 0) {
+		/* save errno before logging can clobber it */
+		err = -errno;
+		PERROR("socket");
+		goto fail;
+	}
+
+	valve->sock = s;
+
+	/* brname is data, never a format string */
+	if (valve->brname[0] == '/')
+		n = snprintf(addr.sun_path, sizeof(addr.sun_path),
+			     "%s", valve->brname);
+	else
+		n = snprintf(addr.sun_path, sizeof(addr.sun_path),
+			     "%s/%s", TD_VALVE_SOCKDIR, valve->brname);
+
+	/* do not connect to a silently truncated path */
+	if (n < 0 || (size_t)n >= sizeof(addr.sun_path)) {
+		err = -ENAMETOOLONG;
+		goto fail;
+	}
+
+	err = connect(valve->sock, (const struct sockaddr *)&addr,
+		      sizeof(addr));
+	if (err) {
+		err = -errno;
+		goto fail;
+	}
+
+	id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+					   valve->sock, 0,
+					   __valve_sock_event,
+					   valve);
+	if (id < 0) {
+		err = id;
+		goto fail;
+	}
+
+	valve->sock_id = id;
+
+	id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+					   -1, 0,
+					   __valve_sched_event,
+					   valve);
+	if (id < 0) {
+		err = id;
+		goto fail;
+	}
+
+	valve->sched_id = id;
+
+	INFO("Connected to %s", addr.sun_path);
+
+	valve->cred = 0;
+	valve->need = 0;
+	valve->done = 0;
+
+	/* nothing to flush yet: keep the flush event masked */
+	valve_clear_done_pending(valve);
+
+	return 0;
+
+fail:
+	valve_sock_close(valve);
+	return err;
+}
+
+/*
+ * Non-blocking send of one message to the bridge.
+ * Returns 0 if all @size bytes went out, -errno if send() failed and
+ * -EPROTO on a short write.
+ */
+static int
+valve_sock_send(td_valve_t *valve, const void *msg, size_t size)
+{
+	ssize_t sent = send(valve->sock, msg, size, MSG_DONTWAIT);
+
+	if (sent < 0)
+		return -errno;
+
+	return (sent == size) ? 0 : -EPROTO;
+}
+
+/*
+ * Non-blocking receive from the bridge.
+ * Returns the number of bytes read (0 when the peer closed) or -errno.
+ */
+static int
+valve_sock_recv(td_valve_t *valve, void *msg, size_t size)
+{
+	ssize_t got = recv(valve->sock, msg, size, MSG_DONTWAIT);
+
+	if (got < 0)
+		return -errno;
+
+	return got;
+}
+
+/*
+ * Retry-timer callback: try to connect again.  Once the connection is
+ * up the retry event is dropped and its id invalidated, matching how the
+ * other event ids are reset to -1 when released.
+ */
+static void
+__valve_retry_timeout(event_id_t id, char mode, void *private)
+{
+	td_valve_t *valve = private;
+
+	if (valve_sock_open(valve))
+		/* still failing; leave the retry event registered */
+		return;
+
+	tapdisk_server_unregister_event(valve->retry_id);
+	valve->retry_id = -1;
+}
+
+/*
+ * Register a timeout event that retries the connection after
+ * TD_VALVE_CONNECT_INTERVAL.  Must not be called while the socket event
+ * is registered; failing to register the timer is fatal.
+ */
+static void
+valve_schedule_retry(td_valve_t *valve)
+{
+	int id;
+
+	BUG_ON(valve->sock_id >= 0);
+
+	id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, -1,
+					   TD_VALVE_CONNECT_INTERVAL,
+					   __valve_retry_timeout, valve);
+	BUG_ON(id < 0);
+
+	valve->retry_id = id;
+}
+
+/*
+ * Open the bridge connection; if that fails, log the reason and schedule
+ * a retry.  Must not be called on a killed valve.
+ */
+static void
+valve_conn_open(td_valve_t *valve)
+{
+	int err;
+
+	BUG_ON(valve->flags & TD_VALVE_KILLED);
+
+	err = valve_sock_open(valve);
+	if (!err)
+		return;
+
+	WARN("%s: %s", valve->brname, strerror(-err));
+	valve_schedule_retry(valve);
+}
+
+/*
+ * Tear down the bridge connection.
+ *
+ * With @reset set, every request still waiting for credit is forwarded
+ * unmodified and its bookkeeping entry released.  Without it the stored
+ * list is expected to be empty already (warns otherwise).
+ *
+ * NOTE(review): a pending retry event (valve->retry_id) is not
+ * unregistered here -- confirm it cannot fire after the driver closed.
+ */
+static void
+valve_conn_close(td_valve_t *valve, int reset)
+{
+	td_valve_request_t *req, *next;
+
+	valve_sock_close(valve);
+
+	if (reset) {
+		td_valve_for_each_stored_request(req, next, valve) {
+			td_forward_request(req->treq);
+			valve->stats.forw++;
+			valve_free_request(valve, req);
+		}
+	}
+
+	WARN_ON(!list_empty(&valve->stor));
+}
+
+/*
+ * Drop the current connection, releasing all stored requests, and try
+ * to establish a new one.
+ */
+static void
+valve_conn_reset(td_valve_t *valve)
+{
+	const int reset = 1;
+
+	valve_conn_close(valve, reset);
+	valve_conn_open(valve);
+}
+
+/*
+ * Read credit grants from the bridge.
+ *
+ * The bridge sends a stream of unsigned long values, each granting that
+ * many bytes.  Grants are summed and moved from valve->need to
+ * valve->cred.
+ *
+ *  - EOF, a receive error, a partial word, or more credit than was asked
+ *    for resets the connection.
+ *  - -EAGAIN simply means there is nothing to read.
+ *  - A single grant >= TD_RLB_REQUEST_MAX kills the valve.
+ */
+static void
+valve_conn_receive(td_valve_t *valve)
+{
+	unsigned long buf[32], cred = 0;
+	size_t i, count;
+	ssize_t n;
+	int err;
+
+	n = valve_sock_recv(valve, buf, sizeof(buf));
+	if (!n) {
+		err = -ECONNRESET;
+		goto reset;
+	}
+
+	if (n < 0) {
+		err = n;
+		if (err != -EAGAIN)
+			goto reset;
+
+		/*
+		 * Nothing available.  Must not fall through: a negative
+		 * n converts to a huge unsigned count below.
+		 */
+		return;
+	}
+
+	/* a torn word would desynchronize the stream */
+	if (n % sizeof(buf[0])) {
+		err = -EPROTO;
+		goto reset;
+	}
+
+	count = n / sizeof(buf[0]);
+
+	for (i = 0; i < count; i++) {
+		err = WARN_ON(buf[i] >= TD_RLB_REQUEST_MAX);
+		if (err)
+			goto kill;
+
+		cred += buf[i];
+	}
+
+	if (cred > valve->need) {
+		err = -EINVAL;
+		goto reset;
+	}
+
+	valve->cred += cred;
+	valve->need -= cred;
+
+	return;
+
+reset:
+	VERR(err, "resetting connection");
+	valve_conn_reset(valve);
+	return;
+
+kill:
+	ERR("Killing valve.");
+	valve_kill(valve);
+}
+
+/*
+ * Send one message to the bridge asking for @size more bytes of credit
+ * and reporting the bytes completed since the last message.  The
+ * completed counter is cleared and the flush event masked before
+ * sending.  A send failure resets the connection.
+ */
+static void
+valve_conn_request(td_valve_t *valve, unsigned long size)
+{
+	struct td_valve_req msg;
+	int err;
+
+	msg.need = size;
+	msg.done = valve->done;
+
+	valve->done = 0;
+	valve->need += size;
+
+	valve_clear_done_pending(valve);
+
+	err = valve_sock_send(valve, &msg, sizeof(msg));
+	if (err) {
+		VERR(err, "resetting connection");
+		valve_conn_reset(valve);
+	}
+}
+
+/*
+ * Try to pay for @treq out of the available credit.
+ *
+ * Returns 0 if the request may proceed: either credit was deducted, or
+ * the valve is killed or disconnected and does not limit at all.
+ * Returns -EAGAIN if the credit does not cover the request.
+ */
+static int
+valve_expend_request(td_valve_t *valve, const td_request_t treq)
+{
+	const unsigned int size = TREQ_SIZE(treq);
+
+	if ((valve->flags & TD_VALVE_KILLED) || valve->sock < 0)
+		return 0;
+
+	if (valve->cred < size)
+		return -EAGAIN;
+
+	valve->cred -= size;
+
+	return 0;
+}
+
+/*
+ * Completion callback installed on requests forwarded from the stored
+ * list.  Accounts the completed bytes for the next report to the bridge
+ * and, once every sector of the original request has completed,
+ * completes the original request and releases its bookkeeping entry.
+ */
+static void
+__valve_complete_treq(td_request_t treq, int error)
+{
+	td_valve_request_t *req = treq.cb_data;
+	td_valve_t *valve = req->valve;
+
+	BUG_ON(req->secs < treq.secs);
+	req->secs -= treq.secs;
+
+	valve->done += TREQ_SIZE(treq);
+	valve_set_done_pending(valve);
+
+	if (req->secs)
+		return;
+
+	td_complete_request(req->treq, error);
+	valve_free_request(valve, req);
+}
+
+/*
+ * Forward stored requests, oldest first, for as long as the credit
+ * lasts.  Each forwarded request is a clone of the original whose
+ * completion is routed through __valve_complete_treq().
+ */
+static void
+valve_forward_stored_requests(td_valve_t *valve)
+{
+	td_valve_request_t *req, *next;
+	td_request_t clone;
+	int err;
+
+	td_valve_for_each_stored_request(req, next, valve) {
+
+		err = valve_expend_request(valve, req->treq);
+		if (err)
+			break;
+
+		clone = req->treq;
+		clone.cb = __valve_complete_treq;
+		clone.cb_data = req;
+
+		/*
+		 * Relink before forwarding: should the completion callback
+		 * run from within td_forward_request(), it unlinks and
+		 * frees req, which must not be put on a list afterwards.
+		 */
+		list_move(&req->entry, &valve->forw);
+
+		td_forward_request(clone);
+		valve->stats.forw++;
+	}
+}
+
+/*
+ * Park @treq until the bridge grants enough credit, and ask the bridge
+ * for that credit.
+ *
+ * Returns 0 on success or -EBUSY when the request pool is exhausted.
+ */
+static int
+valve_store_request(td_valve_t *valve, td_request_t treq)
+{
+	td_valve_request_t *req;
+
+	req = valve_alloc_request(valve);
+	if (!req)
+		return -EBUSY;
+
+	req->treq = treq;
+	req->secs = treq.secs;
+
+	list_add_tail(&req->entry, &valve->stor);
+	valve->stats.stor++;
+
+	/*
+	 * Ask for credit only after the request is on the stored list.
+	 * If the send fails the connection is reset, which forwards and
+	 * releases everything stored -- including this request.  Linking
+	 * it afterwards would leave it stored with no demand registered
+	 * at the bridge.
+	 */
+	valve_conn_request(valve, TREQ_SIZE(treq));
+
+	return 0;
+}
+
+/*
+ * Permanently disable rate limiting: mark the valve killed, then close
+ * the connection, releasing all stored requests.
+ */
+static void
+valve_kill(td_valve_t *valve)
+{
+	const int reset = 1;
+
+	valve->flags |= TD_VALVE_KILLED;
+
+	valve_conn_close(valve, reset);
+}
+
+/*
+ * Bring a valve into its initial, disconnected state: zeroed counters,
+ * empty lists, invalid fd and event ids, and every pool entry on the
+ * free stack.
+ */
+static void
+valve_init(td_valve_t *valve, unsigned long flags)
+{
+	int i;
+
+	memset(valve, 0, sizeof(*valve));
+
+	valve->flags = flags;
+
+	valve->sock = -1;
+	valve->sock_id = -1;
+	valve->sched_id = -1;
+	valve->retry_id = -1;
+
+	INIT_LIST_HEAD(&valve->stor);
+	INIT_LIST_HEAD(&valve->forw);
+
+	/* push in descending order so that reqv[0] is handed out first */
+	i = ARRAY_SIZE(valve->reqv);
+	while (i-- > 0) {
+		td_valve_request_t *req = &valve->reqv[i];
+
+		req->valve = valve;
+		INIT_LIST_HEAD(&req->entry);
+
+		valve_free_request(valve, req);
+	}
+}
+
+/*
+ * Driver close hook.  Both request lists are expected to be empty
+ * (warns otherwise); the connection is closed without releasing stored
+ * requests and the bridge name is freed.  Always returns 0.
+ *
+ * NOTE(review): a retry event that is still registered is not removed
+ * here -- confirm it cannot fire once the driver data is gone.
+ */
+static int
+td_valve_close(td_driver_t *driver)
+{
+	td_valve_t *valve = driver->data;
+
+	WARN_ON(!list_empty(&valve->stor));
+	WARN_ON(!list_empty(&valve->forw));
+
+	valve_conn_close(valve, 0);
+
+	/* free(NULL) is a no-op */
+	free(valve->brname);
+	valve->brname = NULL;
+
+	return 0;
+}
+
+/*
+ * Driver open hook.  @name is the bridge to connect to.  The valve is
+ * set up to limit writes only.  A failing connection attempt is not an
+ * error here; it is retried in the background.
+ *
+ * Returns 0, or a negative errno value if the name cannot be copied.
+ */
+static int
+td_valve_open(td_driver_t *driver,
+	      const char *name, td_flag_t flags)
+{
+	td_valve_t *valve = driver->data;
+	int err;
+
+	valve_init(valve, TD_VALVE_WRLIMIT);
+
+	valve->brname = strdup(name);
+	if (valve->brname) {
+		valve_conn_open(valve);
+		return 0;
+	}
+
+	err = -errno;
+	td_valve_close(driver);
+
+	return err;
+}
+
+/*
+ * Queue hook for both reads and writes.
+ *
+ * Requests of a direction that is not limited are forwarded right away.
+ * Limited requests are forwarded if the credit covers them, otherwise
+ * stored until credit arrives; if they cannot be stored either they are
+ * completed with -EBUSY.
+ */
+static void
+td_valve_queue_request(td_driver_t *driver, td_request_t treq)
+{
+	td_valve_t *valve = driver->data;
+	int limited;
+
+	switch (treq.op) {
+	case TD_OP_READ:
+		limited = !!(valve->flags & TD_VALVE_RDLIMIT);
+		break;
+
+	case TD_OP_WRITE:
+		limited = !!(valve->flags & TD_VALVE_WRLIMIT);
+		break;
+
+	default:
+		BUG();
+		/* should BUG() ever return, treat the request as limited */
+		limited = 1;
+	}
+
+	if (limited && valve_expend_request(valve, treq)) {
+		if (valve_store_request(valve, treq))
+			td_complete_request(treq, -EBUSY);
+		return;
+	}
+
+	td_forward_request(treq);
+	valve->stats.forw++;
+}
+
+/* Parent lookup is not supported by this driver: always -EINVAL. */
+static int
+td_valve_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
+{
+	(void)driver;
+	(void)id;
+
+	return -EINVAL;
+}
+
+/* Parent validation is not supported by this driver: always -EINVAL. */
+static int
+td_valve_validate_parent(td_driver_t *driver,
+			 td_driver_t *parent_driver, td_flag_t flags)
+{
+	(void)driver;
+	(void)parent_driver;
+	(void)flags;
+
+	return -EINVAL;
+}
+
+/*
+ * Stats hook: reports the bridge name, flags, the credit counters and,
+ * for both the stored and the forwarded list, a pair
+ * [ current length, running total ].
+ */
+static void
+td_valve_stats(td_driver_t *driver, td_stats_t *st)
+{
+	td_valve_t *valve = driver->data;
+	td_valve_request_t *req, *next;
+	int n_reqs;
+
+	/*
+	 * The conversion has to match the argument: brname is a string
+	 * and flags an unsigned long.
+	 * NOTE(review): assumes the conversion argument is a printf-style
+	 * conversion, as its other uses here suggest -- confirm "s" and
+	 * "#lx" are accepted by tapdisk_stats_field().
+	 */
+	tapdisk_stats_field(st, "bridge", "s", valve->brname);
+	tapdisk_stats_field(st, "flags", "#lx", valve->flags);
+
+	tapdisk_stats_field(st, "cred", "d", valve->cred);
+	tapdisk_stats_field(st, "need", "d", valve->need);
+	tapdisk_stats_field(st, "done", "d", valve->done);
+
+	/*
+	 * stored is [ waiting, total-waits ]
+	 */
+
+	n_reqs = 0;
+	td_valve_for_each_stored_request(req, next, valve)
+		n_reqs++;
+
+	tapdisk_stats_field(st, "stor", "[");
+	tapdisk_stats_val(st, "d", n_reqs);
+	tapdisk_stats_val(st, "llu", valve->stats.stor);
+	tapdisk_stats_leave(st, ']');
+
+	/*
+	 * forwarded is [ in-flight, total-requests ]
+	 */
+
+	n_reqs = 0;
+	td_valve_for_each_forwarded_request(req, next, valve)
+		n_reqs++;
+
+	tapdisk_stats_field(st, "forw", "[");
+	tapdisk_stats_val(st, "d", n_reqs);
+	tapdisk_stats_val(st, "llu", valve->stats.forw);
+	tapdisk_stats_leave(st, ']');
+}
+
+/*
+ * Driver descriptor for the valve.  Reads and writes share a single
+ * queue hook; the per-driver private data is a td_valve_t.
+ */
+struct tap_disk tapdisk_valve = {
+ .disk_type = "tapdisk_valve",
+ .flags = 0,
+ .private_data_size = sizeof(td_valve_t),
+ .td_open = td_valve_open,
+ .td_close = td_valve_close,
+ .td_queue_read = td_valve_queue_request,
+ .td_queue_write = td_valve_queue_request,
+ .td_get_parent_id = td_valve_get_parent_id,
+ .td_validate_parent = td_valve_validate_parent,
+ .td_stats = td_valve_stats,
+};
--- /dev/null
+/*
+ * Copyright (c) 2011, Citrix Systems.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of XenSource Inc. nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <signal.h>
+#include <getopt.h>
+#include <syslog.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/time.h>
+
+#include "block-valve.h"
+#include "compiler.h"
+#include "list.h"
+
+/*
+ * Log sink writing to stderr: the formatted message followed by a
+ * newline.  The priority is not used.
+ */
+static void
+rlb_vlog_vfprintf(int prio, const char *fmt, va_list ap)
+{
+	(void)prio;
+
+	vfprintf(stderr, fmt, ap);
+	fputc('\n', stderr);
+}
+
+/* Active log sink; has to be set before the first rlb_log() call. */
+static void (*rlb_vlog)(int prio, const char *fmt, va_list ap);
+
+/* printf-style front end that hands the message to the active sink. */
+__printf(2, 3)
+static void
+rlb_log(int prio, const char *fmt, ...)
+{
+	va_list args;
+
+	va_start(args, fmt);
+	rlb_vlog(prio, fmt, args);
+	va_end(args);
+}
+
+/* debug verbosity; DBG() messages at a level above it are dropped */
+static int debug = 0;
+
+/*
+ * Logging and assertion helpers.
+ *
+ * Statement-like macros are wrapped in do { } while (0) so they behave as
+ * a single statement inside if/else, and BUG_ON/WARN_ON evaluate their
+ * condition exactly once.
+ */
+#define DBG(_l, _f, _a...) do {					\
+		if (debug >= (_l))				\
+			rlb_log(LOG_DEBUG, _f, ##_a);		\
+	} while (0)
+#define INFO(_f, _a...) rlb_log(LOG_INFO, _f, ##_a)
+#define WARN(_f, _a...) rlb_log(LOG_WARNING, "WARNING: " _f " in %s:%d", \
+				##_a, __func__, __LINE__)
+#define ERR(_f, _a...) rlb_log(LOG_ERR, "ERROR: " _f " in %s:%d", \
+			       ##_a, __func__, __LINE__)
+#define PERROR(_f, _a...) rlb_log(LOG_ERR, _f ": %s in %s:%d", \
+				  ##_a, strerror(errno), __func__, __LINE__)
+
+#define BUG() do {						\
+		ERR("Aborting");				\
+		abort();					\
+	} while (0)
+
+#define BUG_ON(_cond) do {					\
+		int __bug = !!(_cond);				\
+		if (unlikely(__bug)) {				\
+			ERR("(%s) = %d", #_cond, __bug);	\
+			BUG();					\
+		}						\
+	} while (0)
+
+/* Evaluates to the (int) value of the condition. */
+#define WARN_ON(_cond) ({					\
+	int __cond = (_cond);					\
+	if (unlikely(__cond))					\
+		WARN("(%s) = %d", #_cond, __cond);		\
+	__cond;							\
+})
+
+/* NOTE: MAX/MIN evaluate their arguments twice; pass no side effects. */
+#define MAX(a, b) ((a) > (b) ? (a) : (b))
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
+#define ARRAY_SIZE(_a) (sizeof(_a)/sizeof((_a)[0]))
+
+typedef struct ratelimit_bridge td_rlb_t;
+typedef struct ratelimit_connection td_rlb_conn_t;
+
+/*
+ * One accepted client connection.
+ *
+ * 'need' grows by what the client requests and shrinks by what is
+ * granted; 'gntd' grows by what is granted and shrinks by what the
+ * client reports as done.
+ */
+struct ratelimit_connection {
+ int sock;
+
+ unsigned long need; /* I/O requested */
+ unsigned long gntd; /* I/O granted, pending */
+
+ struct list_head open; /* connected */
+ struct list_head wait; /* need > 0 */
+
+ /* wait accounting: start of the current wait, sum of finished waits */
+ struct {
+ struct timeval since;
+ struct timeval total;
+ } wstat;
+};
+
+/* size of the connection pool, also used as the listen() backlog */
+#define RLB_CONN_MAX 1024
+
+/*
+ * Operations implemented by a rate-limiting policy ("valve").  Every
+ * hook is passed the bridge and the policy's private data as produced
+ * by create().
+ *
+ * NOTE(review): the call sites of settimeo/timeout/dispatch/reset are
+ * not part of this section; the remarks on those are assumptions drawn
+ * from the token implementation.
+ */
+struct ratelimit_ops {
+ /* print the policy's command line synopsis to stream */
+ void (*usage)(td_rlb_t *rlb, FILE *stream, void *data);
+
+ /* parse argc/argv and allocate *data; 0 or negative errno */
+ int (*create)(td_rlb_t *rlb, int argc, char **argv, void **data);
+ /* release what create() allocated */
+ void (*destroy)(td_rlb_t *rlb, void *data);
+
+ /* log the policy's current state */
+ void (*info)(td_rlb_t *rlb, void *data);
+
+ /* set *tv to the next timeout, or to NULL for none */
+ void (*settimeo)(td_rlb_t *rlb, struct timeval **tv, void *data);
+ /* presumably invoked when that timeout expires */
+ void (*timeout)(td_rlb_t *rlb, void *data);
+ /* grant credit to waiting connections */
+ void (*dispatch)(td_rlb_t *rlb, void *data);
+ /* put the policy back into its initial state */
+ void (*reset)(td_rlb_t *rlb, void *data);
+};
+
+/* State of the bridge daemon. */
+struct ratelimit_bridge {
+ /* bridge name; the socket is TD_VALVE_SOCKDIR/name */
+ char *name;
+ char *ident;
+
+ /* listening socket; path points into addr once bound, for unlink() */
+ struct sockaddr_un addr;
+ char *path;
+ int sock;
+
+ struct list_head open; /* all connections */
+ struct list_head wait; /* all in need */
+
+ /* 'now' is the current time; the token refill credits now - ts */
+ struct timeval ts, now;
+
+ /* connection pool and the stack of free pool entries */
+ td_rlb_conn_t connv[RLB_CONN_MAX];
+ td_rlb_conn_t *free[RLB_CONN_MAX];
+ int n_free;
+
+ /* the active policy and its private data */
+ struct rlb_valve {
+ struct ratelimit_ops *ops;
+ void *data;
+ } valve;
+};
+
+/* Walk all open connections. */
+#define rlb_for_each_conn(_conn, _rlb)				\
+	list_for_each_entry(_conn, &(_rlb)->open, open)
+
+/* As above; the current connection may be closed while walking. */
+#define rlb_for_each_conn_safe(_conn, _next, _rlb)		\
+	list_for_each_entry_safe(_conn, _next, &(_rlb)->open, open)
+
+/*
+ * Walk all connections in need.  list_for_each_entry() has no cursor for
+ * the next entry, so _next is accepted for symmetry with the _safe
+ * variant but not used.
+ */
+#define rlb_for_each_waiting(_conn, _next, _rlb)		\
+	list_for_each_entry(_conn, &(_rlb)->wait, wait)
+
+/* As above; the current connection may leave the list while walking. */
+#define rlb_for_each_waiting_safe(_conn, _next, _rlb)		\
+	list_for_each_entry_safe(_conn, _next, &(_rlb)->wait, wait)
+
+/* list node on bridge->open -> connection */
+#define rlb_conn_entry(_list)					\
+	list_entry(_list, td_rlb_conn_t, open)
+
+/* list node on bridge->wait -> connection */
+#define rlb_wait_entry(_list)					\
+	list_entry(_list, td_rlb_conn_t, wait)
+
+static struct ratelimit_ops *rlb_find_valve(const char *name);
+
+static int rlb_create_valve(td_rlb_t *, struct rlb_valve *,
+ const char *name, int argc, char **argv);
+
+/*
+ * util
+ */
+
+/* case labels matching a unit prefix in either letter case */
+#define case_G case 'G': case 'g'
+#define case_M case 'M': case 'm'
+#define case_K case 'K': case 'k'
+
+/*
+ * Parse a non-negative size with an optional unit.
+ *
+ * The number is parsed by strtoul() with base 0.  A trailing K, M or G
+ * (either case) multiplies by 1024, 1024^2 or 1024^3; with an
+ * additional 'i' (Ki, Mi, Gi) the factors are 1000, 1000^2, 1000^3.
+ *
+ * NOTE(review): this is the reverse of the usual convention, where the
+ * 'i' forms are the binary ones.  Kept as is since existing command
+ * lines may depend on it -- confirm the intent.
+ *
+ * Returns the value, or -EINVAL if there are no digits, if anything
+ * unexpected follows the number, or if the result does not fit a long.
+ */
+static long
+rlb_strtol(const char *s)
+{
+	const unsigned long max = ~0UL >> 1; /* largest positive long */
+	unsigned long l, u = 1;
+	char *end, p, q;
+
+	l = strtoul(s, &end, 0);
+	if (end == s)
+		goto fail;
+
+	if (*end) {
+		p = *end++;
+
+		switch (p) {
+		case_G: case_M: case_K:
+
+			q = *end;
+
+			switch (q) {
+			case 'i':
+				/* nothing may follow the unit */
+				if (end[1])
+					goto fail;
+
+				switch (p) {
+				case_G:
+					u *= 1000;
+					/* fallthrough */
+				case_M:
+					u *= 1000;
+					/* fallthrough */
+				case_K:
+					u *= 1000;
+				}
+				break;
+
+			case 0:
+				switch (p) {
+				case_G:
+					u *= 1024;
+					/* fallthrough */
+				case_M:
+					u *= 1024;
+					/* fallthrough */
+				case_K:
+					u *= 1024;
+				}
+				break;
+
+			default:
+				goto fail;
+			}
+			break;
+
+		default:
+			goto fail;
+		}
+	}
+
+	/* also catches strtoul() saturating at ULONG_MAX */
+	if (l > max / u)
+		goto fail;
+
+	return l * u;
+
+fail:
+	return -EINVAL;
+}
+
+/*
+ * vasprintf() wrapper returning the allocated string, or NULL on
+ * failure.  The caller frees the result.
+ */
+static char*
+vmprintf(const char *fmt, va_list ap)
+{
+	char *str;
+
+	if (vasprintf(&str, fmt, ap) < 0)
+		return NULL;
+
+	return str;
+}
+
+/*
+ * printf into a newly allocated string.  Returns NULL on failure; the
+ * caller frees the result.
+ */
+__printf(1, 2)
+static char*
+mprintf(const char *fmt, ...)
+{
+	va_list args;
+	char *str;
+
+	va_start(args, fmt);
+	str = vmprintf(fmt, args);
+	va_end(args);
+
+	return str;
+}
+
+/*
+ * Scan /proc/sys/<name> with the given scanf format.
+ * Returns the vfscanf() result, or -errno if the path could not be
+ * built or the file not opened.
+ */
+static int
+sysctl_vscanf(const char *name, const char *fmt, va_list ap)
+{
+	char *path;
+	FILE *f;
+	int rv;
+
+	path = mprintf("/proc/sys/%s", name);
+	if (!path)
+		return -errno;
+
+	f = fopen(path, "r");
+	if (f) {
+		rv = vfscanf(f, fmt, ap);
+		fclose(f);
+	} else
+		rv = -errno;
+
+	free(path);
+
+	return rv;
+}
+
+/* Variadic front end to sysctl_vscanf(); same return value. */
+static int
+sysctl_scanf(const char *name, const char *fmt, ...)
+{
+	va_list args;
+	int ret;
+
+	va_start(args, fmt);
+	ret = sysctl_vscanf(name, fmt, args);
+	va_end(args);
+
+	return ret;
+}
+
+/*
+ * Read a single unsigned number from /proc/sys/<name>.
+ * Returns the value, a negative value if the file could not be read, or
+ * -EINVAL if it does not start with a number.
+ */
+static long
+sysctl_strtoul(const char *name)
+{
+	/* must be unsigned long: "%lu" stores a full unsigned long */
+	unsigned long val;
+	int n;
+
+	n = sysctl_scanf(name, "%lu", &val);
+	if (n < 0)
+		return n;
+	if (n != 1)
+		return -EINVAL;
+
+	return val;
+}
+
+
+/* Convert a timeval to microseconds. */
+static long long
+rlb_tv_usec(const struct timeval *tv)
+{
+	const long long sec = tv->tv_sec;
+
+	return sec * 1000000 + tv->tv_usec;
+}
+
+/* Microseconds from @since to the bridge's current time (rlb->now). */
+static long long
+rlb_usec_since(td_rlb_t *rlb, const struct timeval *since)
+{
+	struct timeval elapsed;
+
+	timersub(&rlb->now, since, &elapsed);
+
+	return rlb_tv_usec(&elapsed);
+}
+
+/*
+ * Restart option parsing after a '--': drop the first *optind - 1
+ * arguments from argc/argv and set *optind back to 1.
+ */
+static inline void
+rlb_argv_shift(int *optind, int *argc, char ***argv)
+{
+	const int skip = *optind - 1;
+
+	*optind = skip;
+
+	*argc -= skip;
+	*argv += skip;
+
+	*optind = 1;
+}
+
+/*
+ * socket I/O
+ */
+
+/*
+ * Remove the socket file, if the socket was bound, and close the
+ * listening socket.  Safe to call in any state.
+ */
+static void
+rlb_sock_close(td_rlb_t *rlb)
+{
+	const int fd = rlb->sock;
+
+	if (rlb->path) {
+		unlink(rlb->path);
+		rlb->path = NULL;
+	}
+
+	if (fd >= 0) {
+		rlb->sock = -1;
+		close(fd);
+	}
+}
+
+/*
+ * Create the listening socket at TD_VALVE_SOCKDIR/<name>.
+ *
+ * rlb->path is set only once bind() succeeded, so a failure never
+ * unlinks a socket file this process did not create.
+ *
+ * Returns 0 or a negative errno value.
+ */
+static int
+rlb_sock_open(td_rlb_t *rlb)
+{
+	int s, err, n;
+
+	rlb->sock = -1;
+
+	s = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (s < 0) {
+		/* save errno before logging can clobber it */
+		err = -errno;
+		PERROR("socket");
+		goto fail;
+	}
+
+	rlb->sock = s;
+
+	rlb->addr.sun_family = AF_UNIX;
+	n = snprintf(rlb->addr.sun_path, sizeof(rlb->addr.sun_path),
+		     "%s/%s", TD_VALVE_SOCKDIR, rlb->name);
+
+	/* do not bind to a silently truncated path */
+	if (n < 0 || (size_t)n >= sizeof(rlb->addr.sun_path)) {
+		err = -ENAMETOOLONG;
+		ERR("%s/%s: name too long", TD_VALVE_SOCKDIR, rlb->name);
+		goto fail;
+	}
+
+	err = bind(rlb->sock, (const struct sockaddr *)&rlb->addr,
+		   sizeof(rlb->addr));
+	if (err) {
+		err = -errno;
+		PERROR("%s", rlb->addr.sun_path);
+		goto fail;
+	}
+
+	rlb->path = rlb->addr.sun_path;
+
+	err = listen(rlb->sock, RLB_CONN_MAX);
+	if (err) {
+		err = -errno;
+		PERROR("listen(%s)", rlb->addr.sun_path);
+		goto fail;
+	}
+
+	return 0;
+
+fail:
+	rlb_sock_close(rlb);
+	return err;
+}
+
+/*
+ * Non-blocking send to a connection.
+ * Returns -errno if send() failed and -EPROTO on a short, non-empty
+ * write; 0 otherwise (a zero-length write counts as success).
+ */
+static int
+rlb_sock_send(td_rlb_t *rlb, td_rlb_conn_t *conn,
+	      const void *msg, size_t size)
+{
+	ssize_t sent = send(conn->sock, msg, size, MSG_DONTWAIT);
+
+	if (sent < 0)
+		return -errno;
+
+	if (sent == 0 || sent == size)
+		return 0;
+
+	return -EPROTO;
+}
+
+/*
+ * Non-blocking receive from a connection.
+ * Returns the number of bytes read (0 when the peer closed) or -errno.
+ */
+static int
+rlb_sock_recv(td_rlb_t *rlb, td_rlb_conn_t *conn,
+	      void *msg, size_t size)
+{
+	ssize_t got = recv(conn->sock, msg, size, MSG_DONTWAIT);
+
+	if (got < 0)
+		return -errno;
+
+	return got;
+}
+
+/*
+ * Pop a connection off the free stack.
+ * Returns NULL when no free connection is left.
+ */
+static td_rlb_conn_t *
+rlb_conn_alloc(td_rlb_t *rlb)
+{
+	if (unlikely(!(rlb->n_free > 0)))
+		return NULL;
+
+	rlb->n_free--;
+
+	return rlb->free[rlb->n_free];
+}
+
+/* Push a connection back onto the free stack; overflow is a bug. */
+static void
+rlb_conn_free(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+	BUG_ON(rlb->n_free >= RLB_CONN_MAX);
+
+	rlb->free[rlb->n_free] = conn;
+	rlb->n_free++;
+}
+
+/* A connection's id is its index in the bridge's connection pool. */
+static int
+rlb_conn_id(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+	const td_rlb_conn_t *base = rlb->connv;
+
+	return conn - base;
+}
+
+/*
+ * Log one connection: outstanding need, how long it has been waiting,
+ * the accumulated wait time and the amount granted.  Warns if 'need'
+ * and membership on the wait list disagree.
+ */
+static void
+rlb_conn_info(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+	long long wtime;
+	int waits;
+
+	wtime = 0;
+	waits = !list_empty(&conn->wait);
+	if (waits)
+		wtime = rlb_usec_since(rlb, &conn->wstat.since) / 1000;
+
+	WARN_ON(!!conn->need != waits);
+
+	/*
+	 * Conversions have to match the arguments: wtime is a signed
+	 * long long, and the timeval members are cast to long because
+	 * their underlying types vary between platforms.
+	 */
+	INFO("conn[%d] needs %lu (since %lld ms, total %ld.%06ld s),"
+	     " %lu granted",
+	     rlb_conn_id(rlb, conn), conn->need, wtime,
+	     (long)conn->wstat.total.tv_sec,
+	     (long)conn->wstat.total.tv_usec,
+	     conn->gntd);
+}
+
+/* Log every open connection. */
+static void
+rlb_conn_infos(td_rlb_t *rlb)
+{
+	td_rlb_conn_t *c;
+
+	rlb_for_each_conn(c, rlb) {
+		rlb_conn_info(rlb, c);
+	}
+}
+
+/*
+ * Close a connection: log its final state, close the socket, take it
+ * off the open and wait lists and return it to the pool.
+ */
+static void
+rlb_conn_close(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+	int s = conn->sock;
+
+	INFO("Connection %d closed.", rlb_conn_id(rlb, conn));
+	rlb_conn_info(rlb, conn);
+
+	/* 0 is a valid descriptor; only negative values mean "none" */
+	if (s >= 0) {
+		close(s);
+		conn->sock = -1;
+	}
+
+	list_del_init(&conn->wait);
+	list_del(&conn->open);
+
+	rlb_conn_free(rlb, conn);
+}
+
+/*
+ * Read and apply request messages from a connection.
+ *
+ * Each message adds to the connection's need and acknowledges
+ * previously granted I/O as done.  A connection that ends up in need is
+ * put on the bridge's wait list.
+ *
+ *  - EOF closes the connection.
+ *  - -EAGAIN means there is nothing to read and is not an error.
+ *  - Any other receive error, a partial message, or a message violating
+ *    the limits closes the connection with a warning.
+ */
+static void
+rlb_conn_receive(td_rlb_t *rlb, td_rlb_conn_t *conn)
+{
+	struct td_valve_req buf[32], req = { -1, -1 };
+	ssize_t n;
+	int i, count, err;
+
+	n = rlb_sock_recv(rlb, conn, buf, sizeof(buf));
+	if (!n)
+		goto close;
+
+	if (n < 0) {
+		err = n;
+		if (err != -EAGAIN)
+			goto fail;
+
+		/*
+		 * Nothing available.  Must not fall through: a negative
+		 * n would fail the size check below and close a healthy
+		 * connection.
+		 */
+		return;
+	}
+
+	if (unlikely(n % sizeof(req))) {
+		err = -EPROTO;
+		goto fail;
+	}
+
+	count = n / sizeof(buf[0]);
+
+	for (i = 0; i < count; i++) {
+		req = buf[i];
+
+		if (unlikely(req.need > TD_RLB_REQUEST_MAX)) {
+			err = -EINVAL;
+			goto fail;
+		}
+
+		/* cannot be done with more than was granted */
+		if (unlikely(req.done > conn->gntd)) {
+			err = -EINVAL;
+			goto fail;
+		}
+
+		conn->need += req.need;
+		conn->gntd -= req.done;
+
+		DBG(8, "rcv: %lu/%lu need=%lu gntd=%lu",
+		    req.need, req.done, conn->need, conn->gntd);
+
+		if (unlikely(conn->need > TD_RLB_REQUEST_MAX)) {
+			err = -EINVAL;
+			goto fail;
+		}
+	}
+
+	/* start waiting, unless already on the list */
+	if (conn->need && list_empty(&conn->wait)) {
+		list_add_tail(&conn->wait, &rlb->wait);
+		conn->wstat.since = rlb->now;
+	}
+
+	return;
+
+fail:
+	WARN("err = %d (%s)"
+	     " (need %ld/%ld, %ld/%ld done),"
+	     " closing connection.",
+	     err, strerror(-err),
+	     req.need, conn->need, req.done, conn->gntd);
+
+	rlb_conn_info(rlb, conn);
+close:
+	rlb_conn_close(rlb, conn);
+}
+
+/*
+ * Grant @need bytes to a connection: send the grant, move the amount
+ * from 'need' to 'gntd' and, once nothing is needed any more, account
+ * the wait time and leave the wait list.  A send failure closes the
+ * connection.  Granting more than is needed is a bug.
+ */
+static void
+rlb_conn_respond(td_rlb_t *rlb, td_rlb_conn_t *conn, unsigned long need)
+{
+	struct timeval delta;
+	int err;
+
+	BUG_ON(need > conn->need);
+
+	err = rlb_sock_send(rlb, conn, &need, sizeof(need));
+	if (err) {
+		WARN("err = %d, killing connection.", err);
+		rlb_conn_close(rlb, conn);
+		return;
+	}
+
+	conn->gntd += need;
+	conn->need -= need;
+
+	DBG(8, "snd: %lu need=%lu gntd=%lu", need, conn->need, conn->gntd);
+
+	if (conn->need)
+		return;
+
+	timersub(&rlb->now, &conn->wstat.since, &delta);
+	timeradd(&conn->wstat.total, &delta, &conn->wstat.total);
+
+	list_del_init(&conn->wait);
+}
+
+/*
+ * Accept a pending connection on the listening socket and add it to the
+ * list of open connections.  If accept() fails or the connection pool is
+ * exhausted, a warning is logged and nothing is added.
+ */
+static void
+rlb_accept_conn(td_rlb_t *rlb)
+{
+	td_rlb_conn_t *conn;
+	int s, err;
+
+	/* accept() reports failure with -1; 0 is a valid descriptor */
+	s = accept(rlb->sock, NULL, NULL);
+	if (s < 0) {
+		err = -errno;
+		goto fail;
+	}
+
+	conn = rlb_conn_alloc(rlb);
+	if (!conn) {
+		err = -ENOMEM;
+		close(s);
+		goto fail;
+	}
+
+	INFO("Accepting connection %d.", rlb_conn_id(rlb, conn));
+
+	memset(conn, 0, sizeof(*conn));
+	INIT_LIST_HEAD(&conn->wait);
+	conn->sock = s;
+	list_add_tail(&conn->open, &rlb->open);
+
+	return;
+
+fail:
+	WARN("err = %d", err);
+}
+
+/* Sum of the granted, not yet completed I/O over all open connections. */
+static long long
+rlb_pending(td_rlb_t *rlb)
+{
+	long long total = 0;
+	td_rlb_conn_t *c;
+
+	rlb_for_each_conn(c, rlb) {
+		total += c->gntd;
+	}
+
+	return total;
+}
+
+/*
+ * token bucket valve
+ */
+
+typedef struct ratelimit_token td_rlb_token_t;
+
+/* Private data of the token bucket policy. */
+struct ratelimit_token {
+ /* current credit in bytes; goes negative when overdrawn */
+ long cred;
+ /* upper bound for cred, in bytes (--cap) */
+ long cap;
+ /* refill rate in bytes per second (--rate) */
+ long rate;
+ /* storage for the timeout handed out by rlb_token_settimeo() */
+ struct timeval timeo;
+};
+
+/*
+ * Compute the next timeout.  With nobody waiting there is none (*_tv is
+ * set to NULL).  Otherwise it is the time the refill rate needs to
+ * bring the overdrawn credit back to zero.
+ */
+static void
+rlb_token_settimeo(td_rlb_t *rlb, struct timeval **_tv, void *data)
+{
+	td_rlb_token_t *token = data;
+	struct timeval *tv;
+	long long us;
+
+	if (list_empty(&rlb->wait)) {
+		*_tv = NULL;
+		return;
+	}
+
+	tv = &token->timeo;
+
+	/* somebody waits, so the bucket is expected to be overdrawn */
+	WARN_ON(token->cred >= 0);
+
+	us = -token->cred;
+	us = us * 1000000 / token->rate;
+
+	tv->tv_sec = us / 1000000;
+	tv->tv_usec = us % 1000000;
+
+	WARN_ON(!timerisset(tv));
+
+	*_tv = tv;
+}
+
+/*
+ * Add the credit earned between rlb->ts and rlb->now at token->rate
+ * bytes per second, never exceeding token->cap.
+ *
+ * The elapsed time is clamped to the time a full refill takes before it
+ * is multiplied by the rate, which keeps the product small after long
+ * idle periods.  rlb->ts is not updated here.
+ */
+static void
+rlb_token_refill(td_rlb_t *rlb, td_rlb_token_t *token)
+{
+ struct timeval tv;
+ long long cred, max_usec;
+
+ /* max time needed to refill up to cap */
+
+ /* (cap - cred) bytes at 'rate' bytes/s, in usecs, rounded up */
+ max_usec = token->cap - token->cred;
+ max_usec *= 1000000;
+ max_usec += token->rate - 1;
+ max_usec /= token->rate;
+
+ /* actual credit gained */
+
+ timersub(&rlb->now, &rlb->ts, &tv);
+
+ /* elapsed usecs -> bytes, rounded down */
+ cred = rlb_tv_usec(&tv);
+ cred = MIN(cred, max_usec);
+ cred *= token->rate;
+ cred /= 1000000;
+
+ /* up to cap */
+
+ token->cred += cred;
+ token->cred = MIN(token->cred, token->cap);
+}
+
+/*
+ * Refill the bucket, then serve waiting connections in order.  Each one
+ * gets everything it needs, even if that overdraws the bucket; serving
+ * stops once the credit is negative.
+ */
+static void
+rlb_token_dispatch(td_rlb_t *rlb, void *data)
+{
+	td_rlb_token_t *token = data;
+	td_rlb_conn_t *conn, *next;
+
+	rlb_token_refill(rlb, token);
+
+	rlb_for_each_waiting_safe(conn, next, rlb) {
+		unsigned long need;
+
+		if (token->cred < 0)
+			break;
+
+		need = conn->need;
+		token->cred -= need;
+
+		rlb_conn_respond(rlb, conn, need);
+	}
+}
+
+/* Reset the bucket to full: the credit is set to the cap. */
+static void
+rlb_token_reset(td_rlb_t *rlb, void *data)
+{
+	td_rlb_token_t *bucket = data;
+
+	bucket->cred = bucket->cap;
+}
+
+/* Release the token bucket state; @data may be NULL. */
+static void
+rlb_token_destroy(td_rlb_t *rlb, void *data)
+{
+	/* free(NULL) is a no-op */
+	free(data);
+}
+
+/*
+ * Create a token bucket from its command line options:
+ *
+ *   -r, --rate <n[KMG]>  refill rate, required and non-zero
+ *   -c, --cap  <n[KMG]>  bucket capacity, defaults to 0
+ *
+ * The bucket starts out full.  On success *data is set to the new
+ * bucket and 0 is returned.  Returns -ENOMEM if allocation fails and
+ * -EINVAL on invalid or missing options; *data is left alone then.
+ */
+static int
+rlb_token_create(td_rlb_t *rlb, int argc, char **argv, void **data)
+{
+ td_rlb_token_t *token;
+ int err;
+
+ token = calloc(1, sizeof(*token));
+ if (!token) {
+ err = -ENOMEM;
+ goto fail;
+ }
+
+ token->rate = 0;
+ token->cap = 0;
+
+ do {
+ const struct option longopts[] = {
+ { "rate", 1, NULL, 'r' },
+ { "cap", 1, NULL, 'c' },
+ { NULL, 0, NULL, 0 }
+ };
+ int c;
+
+ c = getopt_long(argc, argv, "r:c:", longopts, NULL);
+ if (c < 0)
+ break;
+
+ switch (c) {
+ case 'r':
+ /* rlb_strtol() returns a negative value on bad input */
+ token->rate = rlb_strtol(optarg);
+ if (token->rate < 0) {
+ ERR("invalid --rate");
+ goto usage;
+ }
+ break;
+
+ case 'c':
+ token->cap = rlb_strtol(optarg);
+ if (token->cap < 0) {
+ ERR("invalid --cap");
+ goto usage;
+ }
+ break;
+
+ case '?':
+ goto usage;
+
+ default:
+ BUG();
+ }
+ } while (1);
+
+ /* a zero rate is rejected; other code divides by it */
+ if (!token->rate) {
+ ERR("--rate required");
+ goto usage;
+ }
+
+ /* start with a full bucket */
+ rlb_token_reset(rlb, token);
+
+ *data = token;
+
+ return 0;
+
+fail:
+ if (token)
+ free(token);
+
+ return err;
+
+usage:
+ err = -EINVAL;
+ goto fail;
+}
+
+/* Print the token bucket's command line synopsis (no newline). */
+static void
+rlb_token_usage(td_rlb_t *rlb, FILE *stream, void *data)
+{
+	static const char synopsis[] =
+		" {-t|--type}=token --"
+		" {-r|--rate}=<rate [KMG]>"
+		" {-c|--cap}=<size [KMG]>";
+
+	fputs(synopsis, stream);
+}
+
+/* Log the bucket's rate, capacity and current credit. */
+static void
+rlb_token_info(td_rlb_t *rlb, void *data)
+{
+	const td_rlb_token_t *bucket = data;
+
+	INFO("TOKEN: rate: %ld B/s cap: %ld B cred: %ld B",
+	     bucket->rate, bucket->cap, bucket->cred);
+}
+
+/*
+ * Token bucket policy.  Timeout and dispatch share one handler: both
+ * refill the bucket and serve whoever is waiting.
+ */
+static struct ratelimit_ops rlb_token_ops = {
+ .usage = rlb_token_usage,
+ .create = rlb_token_create,
+ .destroy = rlb_token_destroy,
+ .info = rlb_token_info,
+
+ .settimeo = rlb_token_settimeo,
+ .timeout = rlb_token_dispatch,
+ .dispatch = rlb_token_dispatch,
+ .reset = rlb_token_reset,
+};
+
+/*
+ * meminfo valve
+ */
+
+typedef struct ratelimit_meminfo td_rlb_meminfo_t;
+
+/*
+ * Private data of the meminfo policy, which wraps a nested policy
+ * ('valve').
+ *
+ * NOTE(review): how period, ts, the limits and 'congested' are used is
+ * not visible in this section; the remarks on them follow the info
+ * output and should be confirmed.
+ */
+struct ratelimit_meminfo {
+ /* logged in ms; defaults to 100 */
+ unsigned int period;
+ struct timeval ts;
+
+ /* /proc/meminfo, open only while it is being scanned */
+ FILE *s;
+
+ /* MemTotal, Dirty and Writeback from /proc/meminfo, in kB */
+ unsigned long total;
+ unsigned long dirty;
+ unsigned long writeback;
+
+ /* high and low limits, logged as percentages */
+ unsigned int limit_hi;
+ unsigned int limit_lo;
+ unsigned int congested;
+
+ /* the nested policy and its private data */
+ struct rlb_valve valve;
+ struct timeval timeo;
+};
+
+/*
+ * Log the meminfo policy's settings and last memory figures, then let
+ * the nested policy log its own state.
+ */
+static void
+rlb_meminfo_info(td_rlb_t *rlb, void *data)
+{
+	td_rlb_meminfo_t *mi = data;
+	struct rlb_valve *inner = &mi->valve;
+
+	INFO("MEMINFO: lo/hi: %u/%u%% period: %u ms",
+	     mi->limit_lo, mi->limit_hi, mi->period);
+
+	INFO("MEMINFO: total %lu kB, dirty/writeback %lu/%lu kB",
+	     mi->total, mi->dirty, mi->writeback);
+
+	inner->ops->info(rlb, inner->data);
+}
+
+/* Close the meminfo stream if it is open. */
+static void
+rlb_meminfo_close(td_rlb_meminfo_t *m)
+{
+	FILE *stream = m->s;
+
+	if (!stream)
+		return;
+
+	fclose(stream);
+	m->s = NULL;
+}
+
+/*
+ * Open /proc/meminfo for reading.
+ * Returns 0, or -errno with m->s left NULL.
+ */
+static int
+rlb_meminfo_open(td_rlb_meminfo_t *m)
+{
+	m->s = fopen("/proc/meminfo", "r");
+	if (!m->s)
+		return -errno;
+
+	return 0;
+}
+
+/* Returns 1 if bit @n of *bitmap is set, 0 otherwise. */
+static inline int __test_bit(int n, unsigned long *bitmap)
+{
+	const unsigned long mask = 1UL << n;
+
+	return (*bitmap & mask) != 0;
+}
+
+/* Clears bit @n of *bitmap. */
+static inline void __clear_bit(int n, unsigned long *bitmap)
+{
+	const unsigned long mask = 1UL << n;
+
+	*bitmap = *bitmap & ~mask;
+}
+
+/*
+ * Keys extracted from /proc/meminfo by rlb_meminfo_scan().
+ *
+ * Each entry pairs a sscanf() format yielding one unsigned long with
+ * the offset of the struct ratelimit_meminfo field receiving the value.
+ */
+static struct ratelimit_meminfo_scan {
+ const char *format;
+ ptrdiff_t ptrdiff;
+} rlb_meminfo_scanfs[] = {
+ { "MemTotal: %lu kB",
+ offsetof(struct ratelimit_meminfo, total) },
+ { "Dirty: %lu kB",
+ offsetof(struct ratelimit_meminfo, dirty) },
+ { "Writeback: %lu kB",
+ offsetof(struct ratelimit_meminfo, writeback) },
+};
+
+/*
+ * Refresh the total/dirty/writeback fields of @m from /proc/meminfo.
+ *
+ * The file is read line by line; every key of rlb_meminfo_scanfs that
+ * has not been matched yet is tried against each line, and reading
+ * stops as soon as all keys were found. The stream is opened and closed
+ * within this call.
+ *
+ * Returns 0 on success, the error of rlb_meminfo_open() if the file
+ * cannot be opened, or -ESRCH if the file ended before every key was
+ * seen (fields matched up to that point have already been stored).
+ */
+static int
+rlb_meminfo_scan(td_rlb_meminfo_t *m)
+{
+	const int n_keys = ARRAY_SIZE(rlb_meminfo_scanfs);
+	unsigned long todo;
+	char line[80];
+	int err, k;
+
+	err = rlb_meminfo_open(m);
+	if (err)
+		goto out;
+
+	/* one bit per key which still needs to be found */
+	todo = (1UL << n_keys) - 1;
+
+	while (todo && fgets(line, sizeof(line), m->s)) {
+		for (k = 0; k < n_keys; k++) {
+			struct ratelimit_meminfo_scan *key;
+			unsigned long val;
+
+			if (!__test_bit(k, &todo))
+				continue;
+
+			key = &rlb_meminfo_scanfs[k];
+			if (sscanf(line, key->format, &val) != 1)
+				continue;
+
+			/* store into the field the table entry points at */
+			*(unsigned long *)((char *)m + key->ptrdiff) = val;
+
+			__clear_bit(k, &todo);
+		}
+	}
+
+	err = todo ? -ESRCH : 0;
+out:
+	rlb_meminfo_close(m);
+	return err;
+}
+
+/*
+ * Print the command-line synopsis of the meminfo valve to @stream,
+ * followed by the synopsis of the nested valve.
+ *
+ * @data may be NULL, or refer to a valve whose nested type has not been
+ * resolved yet; a generic "{-t|--type}={...}" placeholder is printed
+ * for the nested part in that case.
+ *
+ * The short option for --low is spelled -L, matching the option string
+ * accepted by rlb_meminfo_create().
+ */
+static void
+rlb_meminfo_usage(td_rlb_t *rlb, FILE *stream, void *data)
+{
+	td_rlb_meminfo_t *m = data;
+
+	fprintf(stream,
+		" {-t|--type}=meminfo "
+		" {-H|--high}=<percent> {-L|--low}=<percent>"
+		" {-p|--period}=<msecs> --");
+
+	if (m && m->valve.ops)
+		m->valve.ops->usage(rlb, stream, m->valve.data);
+	else
+		fprintf(stream, " {-t|--type}={...}");
+}
+
+/*
+ * Release a meminfo valve: destroy the nested valve's data, if any,
+ * then free the valve itself. A NULL @data is accepted and ignored.
+ */
+static void
+rlb_meminfo_destroy(td_rlb_t *rlb, void *data)
+{
+	td_rlb_meminfo_t *mi = data;
+
+	if (!mi)
+		return;
+
+	if (mi->valve.data) {
+		mi->valve.ops->destroy(rlb, mi->valve.data);
+		mi->valve.data = NULL;
+	}
+
+	free(mi);
+}
+
+/*
+ * Create a meminfo valve from the command line in @argc/@argv.
+ *
+ * Parses --period, --type (the nested valve), --high and --low, then
+ * creates the nested valve from the remaining arguments and performs an
+ * initial /proc/meminfo scan.
+ *
+ * Returns 0 on success with *@data set to the new valve. Returns
+ * -EINVAL for command-line errors and another negative errno for any
+ * other failure.
+ *
+ * Ownership on error:
+ *  - On non-usage failures the valve is destroyed and *@data, if it had
+ *    already been set, is reset to NULL so that the caller never sees a
+ *    pointer to freed memory.
+ *  - On -EINVAL raised after *@data was set (i.e. by the nested valve),
+ *    the valve is deliberately kept alive and stays owned through
+ *    *@data, so that the usage callback can still print the nested
+ *    valve's options; the owner's destroy path releases it.
+ *  - On -EINVAL raised before *@data was set, the valve is freed here.
+ */
+static int
+rlb_meminfo_create(td_rlb_t *rlb, int argc, char **argv, void **data)
+{
+	td_rlb_meminfo_t *m;
+	const char *type;
+	long dbr, period;
+	int published = 0;
+	int err;
+
+	m = calloc(1, sizeof(*m));
+	if (!m) {
+		err = -errno;
+		goto fail;
+	}
+
+	type = NULL;
+	m->period = 100;
+
+	do {
+		const struct option longopts[] = {
+			{ "period", 1, NULL, 'p' },
+			{ "type", 1, NULL, 't' },
+			{ "high", 1, NULL, 'H' },
+			{ "low", 1, NULL, 'L' },
+			{ NULL, 0, NULL, 0 }
+		};
+		int c;
+
+		c = getopt_long(argc, argv, "p:t:H:L:", longopts, NULL);
+		if (c < 0)
+			break;
+
+		switch (c) {
+		case 'p':
+			/*
+			 * m->period is unsigned, so the sign has to be
+			 * checked before the value is stored.
+			 */
+			period = rlb_strtol(optarg);
+			if (period < 0)
+				goto usage;
+			m->period = period;
+			break;
+
+		case 'H':
+			m->limit_hi = strtoul(optarg, NULL, 0);
+			break;
+
+		case 'L':
+			m->limit_lo = strtoul(optarg, NULL, 0);
+			break;
+
+		case 't':
+			type = optarg;
+			break;
+
+		case '?':
+			goto usage;
+
+		default:
+			BUG();
+		}
+	} while (1);
+
+	if (!type) {
+		ERR("--type required");
+		goto usage;
+	}
+
+	if (!m->limit_hi || !m->limit_lo) {
+		ERR("--high/--low required");
+		goto usage;
+	}
+
+	if (m->limit_lo >= m->limit_hi) {
+		ERR("invalid --high/--low ratio");
+		goto usage;
+	}
+
+	dbr = sysctl_strtoul("vm/dirty_background_ratio");
+	if (dbr < 0) {
+		err = dbr;
+		goto fail;
+	}
+
+	/* this check is compiled in but disabled by the leading 0 */
+	if (0 && m->limit_lo < dbr) {
+		ERR("--low %u is less than vm.dirty_background_ratio (= %ld)",
+		    m->limit_lo, dbr);
+		err = -EINVAL;
+		goto fail;
+	}
+
+	*data = m;
+	published = 1;
+
+	rlb_argv_shift(&optind, &argc, &argv);
+
+	err = rlb_create_valve(rlb, &m->valve, type, argc, argv);
+	if (err) {
+		if (err == -EINVAL)
+			goto usage;
+		goto fail;
+	}
+
+	err = rlb_meminfo_scan(m);
+	if (err)
+		goto fail;
+
+	return 0;
+
+fail:
+	/* never leave a pointer to the freed valve behind */
+	if (published)
+		*data = NULL;
+	rlb_meminfo_destroy(rlb, m);
+	WARN("err = %d", err);
+	return err;
+
+usage:
+	/* unpublished valves have no other owner: free them here */
+	if (!published)
+		rlb_meminfo_destroy(rlb, m);
+	err = -EINVAL;
+	return err;
+}
+
+/*
+ * Select the timeout for the next main-loop wait.
+ *
+ * While congested, the decision is delegated to the nested valve.
+ * Otherwise no timeout is requested (*_tv = NULL); in that state the
+ * wait list is expected to be empty, which is asserted.
+ */
+static void
+rlb_meminfo_settimeo(td_rlb_t *rlb, struct timeval **_tv, void *data)
+{
+	td_rlb_meminfo_t *m = data;
+	int idle = list_empty(&rlb->wait);
+
+	BUG_ON(!idle && !m->congested);
+
+	if (!m->congested) {
+		*_tv = NULL;
+		return;
+	}
+
+	m->valve.ops->settimeo(rlb, _tv, m->valve.data);
+}
+
+/*
+ * Timer expiry hook. Timeouts are only expected while congested (a
+ * warning is raised otherwise) and are forwarded to the nested valve.
+ */
+static void
+rlb_meminfo_timeout(td_rlb_t *rlb, void *data)
+{
+	td_rlb_meminfo_t *m = data;
+
+	WARN_ON(!m->congested);
+
+	if (!m->congested)
+		return;
+
+	m->valve.ops->timeout(rlb, m->valve.data);
+}
+
+/*
+ * Decide whether the valve is congested, given the available credit
+ * @cred.
+ *
+ * Not congested so far: remain uncongested while @cred is positive;
+ * once it is exhausted, reset the nested valve and report congestion.
+ *
+ * Already congested: hysteresis applies. Congestion only ends once
+ * @cred has reached total * limit_lo / 100; below that the current
+ * congested value is returned unchanged.
+ *
+ * NOTE(review): the caller in this file scales @cred by 1000, whereas
+ * the threshold computed here is not scaled; confirm the units are
+ * meant to differ.
+ *
+ * Returns 0 if uncongested, non-zero if congested.
+ */
+static int
+rlb_meminfo_test_high(td_rlb_t *rlb, td_rlb_meminfo_t *m, long long cred)
+{
+	long long thresh;
+
+	if (!m->congested) {
+		if (cred > 0)
+			return 0;
+
+		m->valve.ops->reset(rlb, m->valve.data);
+		return 1;
+	}
+
+	/* hysteresis */
+	thresh = m->total;
+	thresh *= m->limit_lo;
+	thresh /= 100;
+
+	if (cred >= thresh)
+		return 0;
+
+	return m->congested;
+}
+
+/*
+ * Uncongested dispatch: walk the waiting connections and grant each
+ * one MIN(remaining credit, conn->need) until the credit is used up.
+ *
+ * @_cred holds the available credit on entry and the remaining credit
+ * on return. @m is not consulted.
+ */
+static void
+rlb_meminfo_dispatch_low(td_rlb_t *rlb, td_rlb_meminfo_t *m,
+			 long long *_cred)
+{
+	td_rlb_conn_t *conn, *next;
+	long long left = *_cred;
+
+	rlb_for_each_waiting_safe(conn, next, rlb) {
+		long long grant;
+
+		if (left <= 0)
+			break;
+
+		grant = MIN(left, conn->need);
+		rlb_conn_respond(rlb, conn, grant);
+		left -= grant;
+	}
+
+	*_cred = left;
+}
+
+/*
+ * Dispatch hook of the meminfo valve.
+ *
+ * Refreshes the memory figures from /proc/meminfo at most once per
+ * m->period milliseconds, derives the available credit, and then
+ * either grants credit to waiting connections directly (uncongested)
+ * or hands over to the nested valve (congested).
+ *
+ * A failed scan is reported with a warning and the figures already
+ * stored in @m keep being used; the scan timestamp is still advanced,
+ * so the next attempt happens one period later.
+ */
+static void
+rlb_meminfo_dispatch(td_rlb_t *rlb, void *data)
+{
+	td_rlb_meminfo_t *m = data;
+	long long us, hi, cred, dirty, pend, pct;
+	int err;
+
+	/* we run only once per m->period */
+
+	us = rlb_usec_since(rlb, &m->ts);
+	if (us / 1000 > m->period) {
+		err = rlb_meminfo_scan(m);
+		if (err)
+			WARN("meminfo scan failed, err = %d", err);
+		m->ts = rlb->now;
+	}
+
+	/* uncongested credit:
+	   memory below hi watermark minus pending I/O */
+
+	hi = m->total;
+	hi *= m->limit_hi;
+	hi /= 100;
+
+	dirty = m->dirty + m->writeback;
+
+	cred = hi - dirty;
+	cred *= 1000;
+
+	pend = rlb_pending(rlb);
+	cred -= pend;
+
+	m->congested = rlb_meminfo_test_high(rlb, m, cred);
+
+	/* percentage for the debug line only; avoid dividing by zero */
+	pct = m->total ? dirty * 100 / (long long)m->total : 0;
+
+	DBG(3, "dirty=%lld (%lld) pend=%lld cred=%lld %s",
+	    dirty, pct, pend, cred,
+	    m->congested ? "congested" : "");
+
+	if (!m->congested) {
+		rlb_meminfo_dispatch_low(rlb, m, &cred);
+
+		/* granting credit may have pushed us into congestion */
+		m->congested = rlb_meminfo_test_high(rlb, m, cred);
+	}
+
+	if (m->congested)
+		m->valve.ops->dispatch(rlb, m->valve.data);
+}
+
+/*
+ * Operations table of the "meminfo" valve.
+ *
+ * NOTE(review): .reset is not set here, while rlb_meminfo_test_high()
+ * calls ->reset on its nested valve; nesting a meminfo valve inside
+ * another meminfo valve would therefore call a NULL pointer -- confirm
+ * whether that combination is meant to be supported.
+ */
+static struct ratelimit_ops rlb_meminfo_ops = {
+ .usage = rlb_meminfo_usage,
+ .create = rlb_meminfo_create,
+ .destroy = rlb_meminfo_destroy,
+ .info = rlb_meminfo_info,
+
+ .settimeo = rlb_meminfo_settimeo,
+ .timeout = rlb_meminfo_timeout,
+ .dispatch = rlb_meminfo_dispatch,
+};
+
+/*
+ * main loop
+ */
+
+/*
+ * Log the state of the bridge: first the top-level valve, then the
+ * connections.
+ */
+static void
+rlb_info(td_rlb_t *rlb)
+{
+	struct rlb_valve *top = &rlb->valve;
+
+	top->ops->info(rlb, top->data);
+
+	rlb_conn_infos(rlb);
+}
+
+/* signal mask in effect before rlb_siginit() blocked our signals;
+   installed by pselect() for the duration of the wait */
+static sigset_t rlb_sigunblock;
+/* signals caught by rlb_sigmark(), evaluated by rlb_main_signaled() */
+static sigset_t rlb_sigpending;
+
+/*
+ * Signal handler: log the signal number and record it in
+ * rlb_sigpending for the main loop.
+ */
+static void
+rlb_sigmark(int signo)
+{
+ INFO("Caught SIG%d", signo);
+ sigaddset(&rlb_sigpending, signo);
+}
+
+/*
+ * Set up signal handling for the main loop.
+ *
+ * SIGPIPE is ignored. SIGINT, SIGTERM and SIGUSR1 are routed to
+ * rlb_sigmark() and then blocked; the previous mask is saved in
+ * rlb_sigunblock, which the main loop passes to pselect() so that
+ * these signals are only delivered while waiting.
+ *
+ * All signal sets are explicitly emptied before use, as POSIX requires
+ * sigset_t objects to be initialized with sigemptyset()/sigfillset().
+ *
+ * Returns 0 on success or a negated errno.
+ */
+static int
+rlb_siginit(void)
+{
+	struct sigaction sa_ignore = { .sa_handler = SIG_IGN };
+	struct sigaction sa_pending = { .sa_handler = rlb_sigmark };
+	sigset_t sigmask;
+	int err = 0;
+
+	sigemptyset(&sa_ignore.sa_mask);
+	sigemptyset(&sa_pending.sa_mask);
+	sigemptyset(&rlb_sigpending);
+
+	if (!err)
+		err = sigaction(SIGPIPE, &sa_ignore, NULL);
+	if (!err)
+		err = sigaction(SIGINT, &sa_pending, NULL);
+	if (!err)
+		err = sigaction(SIGTERM, &sa_pending, NULL);
+	if (!err)
+		err = sigaction(SIGUSR1, &sa_pending, NULL);
+	if (err) {
+		err = -errno;
+		goto fail;
+	}
+
+	sigemptyset(&sigmask);
+	sigaddset(&sigmask, SIGINT);
+	sigaddset(&sigmask, SIGTERM);
+	sigaddset(&sigmask, SIGUSR1);
+
+	err = sigprocmask(SIG_BLOCK, &sigmask, &rlb_sigunblock);
+	if (err) {
+		err = -errno;
+		goto fail;
+	}
+
+fail:
+	return err;
+}
+
+/*
+ * Act on the signals recorded in rlb_sigpending after the main loop
+ * was interrupted.
+ *
+ * SIGUSR1 triggers a state dump and is then removed from the pending
+ * set, so that one signal produces exactly one dump rather than one on
+ * every later interruption. SIGINT/SIGTERM request termination and are
+ * left pending.
+ *
+ * Returns -EINTR if the main loop should terminate, 0 otherwise.
+ */
+static int
+rlb_main_signaled(td_rlb_t *rlb)
+{
+	if (sigismember(&rlb_sigpending, SIGUSR1)) {
+		sigdelset(&rlb_sigpending, SIGUSR1);
+		rlb_info(rlb);
+	}
+
+	if (sigismember(&rlb_sigpending, SIGINT) ||
+	    sigismember(&rlb_sigpending, SIGTERM))
+		return -EINTR;
+
+	return 0;
+}
+
+
+/*
+ * Look up the operations table of the valve type called @name.
+ *
+ * Known types are "token" and "meminfo"; the match is exact and
+ * case-sensitive. Returns NULL for any other name.
+ */
+static struct ratelimit_ops *
+rlb_find_valve(const char *name)
+{
+	static const struct {
+		const char *name;
+		struct ratelimit_ops *ops;
+	} valves[] = {
+		{ "token", &rlb_token_ops },
+		{ "meminfo", &rlb_meminfo_ops },
+	};
+	size_t i;
+
+	for (i = 0; i < sizeof(valves) / sizeof(valves[0]); i++) {
+		if (!strcmp(name, valves[i].name))
+			return valves[i].ops;
+	}
+
+	return NULL;
+}
+
+/*
+ * Run one iteration of the main loop: wait for input on stdin, the
+ * listening socket and all connections (or for the valve's timeout),
+ * then process whatever became ready.
+ *
+ * Returns 0 on success or a negated errno from pselect(); -EINTR means
+ * a signal was delivered during the wait.
+ */
+static int
+rlb_main_iterate(td_rlb_t *rlb)
+{
+ td_rlb_conn_t *conn, *next;
+ struct timeval *tv;
+ struct timespec _ts, *ts = &_ts;
+ int nfds, err;
+ fd_set rfds;
+
+ FD_ZERO(&rfds);
+ nfds = 0;
+
+ /* main() sets stdin to NULL once daemonized */
+ if (stdin) {
+ FD_SET(STDIN_FILENO, &rfds);
+ nfds = MAX(nfds, STDIN_FILENO);
+ }
+
+ if (rlb->sock >= 0) {
+ FD_SET(rlb->sock, &rfds);
+ nfds = MAX(nfds, rlb->sock);
+ }
+
+ rlb_for_each_conn(conn, rlb) {
+ FD_SET(conn->sock, &rfds);
+ nfds = MAX(nfds, conn->sock);
+ }
+
+ /* the valve decides whether, and for how long, the wait may time out */
+ rlb->valve.ops->settimeo(rlb, &tv, rlb->valve.data);
+ if (tv) {
+ TIMEVAL_TO_TIMESPEC(tv, ts);
+ } else
+ ts = NULL;
+
+ rlb->ts = rlb->now;
+
+ /* signals blocked by rlb_siginit() are only deliverable in here;
+ from this point on nfds counts the ready descriptors */
+ nfds = pselect(nfds + 1, &rfds, NULL, NULL, ts, &rlb_sigunblock);
+ if (nfds < 0) {
+ err = -errno;
+ if (err != -EINTR)
+ PERROR("select");
+ goto fail;
+ }
+
+ gettimeofday(&rlb->now, NULL);
+
+ /* nothing ready: the wait timed out, which requires a timeout */
+ if (!nfds) {
+ BUG_ON(!ts);
+ rlb->valve.ops->timeout(rlb, rlb->valve.data);
+ }
+
+ if (nfds) {
+ /* connections first; stop early once all ready fds are consumed */
+ rlb_for_each_conn_safe(conn, next, rlb)
+ if (FD_ISSET(conn->sock, &rfds)) {
+ rlb_conn_receive(rlb, conn);
+ if (!--nfds)
+ break;
+ }
+
+ rlb->valve.ops->dispatch(rlb, rlb->valve.data);
+ }
+
+ /* input on stdin triggers a state dump; the character is discarded.
+ NOTE(review): the getc() result is not checked -- presumably stdin
+ stays readable after EOF, which would repeat this on every
+ iteration; confirm */
+ if (unlikely(nfds)) {
+ if (FD_ISSET(STDIN_FILENO, &rfds)) {
+ getc(stdin);
+ rlb_info(rlb);
+ nfds--;
+ }
+ }
+
+ /* any remaining ready descriptor must be the listening socket */
+ if (unlikely(nfds)) {
+ if (FD_ISSET(rlb->sock, &rfds)) {
+ rlb_accept_conn(rlb);
+ nfds--;
+ }
+ }
+
+ /* every ready descriptor must have been accounted for */
+ BUG_ON(nfds);
+ err = 0;
+fail:
+ return err;
+}
+
+/*
+ * Main loop: iterate for as long as the listening socket is open or
+ * connections remain.
+ *
+ * An iteration interrupted by a signal (-EINTR) is followed by
+ * rlb_main_signaled(); if that requests termination the loop ends with
+ * status 0. Any other iteration error ends the loop and is returned.
+ */
+static int
+rlb_main_run(td_rlb_t *rlb)
+{
+	int err;
+
+	for (;;) {
+		err = rlb_main_iterate(rlb);
+
+		if (err == -EINTR) {
+			/* interrupted: terminate cleanly if asked to */
+			if (rlb_main_signaled(rlb)) {
+				err = 0;
+				break;
+			}
+			err = 0;
+		} else if (err) {
+			break;
+		}
+
+		if (rlb->sock < 0 && list_empty(&rlb->open))
+			break;
+	}
+
+	return err;
+}
+
+/*
+ * Close every connection, then the listening socket.
+ */
+static void
+rlb_shutdown(td_rlb_t *rlb)
+{
+	td_rlb_conn_t *conn, *next;
+
+	rlb_for_each_conn_safe(conn, next, rlb) {
+		rlb_conn_close(rlb, conn);
+	}
+
+	rlb_sock_close(rlb);
+}
+
+/*
+ * Print a one-line usage message for @prog to @stream.
+ *
+ * If @rlb has a valve type selected, that valve prints its own
+ * options; otherwise (including @rlb == NULL) the generic options are
+ * shown.
+ */
+static void
+rlb_usage(td_rlb_t *rlb, const char *prog, FILE *stream)
+{
+	fprintf(stream, "Usage: %s <name>", prog);
+
+	if (!rlb || !rlb->valve.ops)
+		fputs(" {-t|--type}={token|meminfo}"
+		      " [-h|--help] [-D|--debug=<n>]", stream);
+	else
+		rlb->valve.ops->usage(rlb, stream, rlb->valve.data);
+
+	fputc('\n', stream);
+}
+
+/*
+ * Tear down a bridge: shut down all sockets, destroy the valve's data
+ * if present, and release the name. The rlb structure itself is not
+ * freed.
+ */
+static void
+rlb_destroy(td_rlb_t *rlb)
+{
+	struct rlb_valve *top = &rlb->valve;
+
+	rlb_shutdown(rlb);
+
+	if (top->data) {
+		top->ops->destroy(rlb, top->data);
+		top->data = NULL;
+	}
+
+	/* free(NULL) is a no-op, so no guard is needed */
+	free(rlb->name);
+	rlb->name = NULL;
+}
+
+/*
+ * Initialize the bridge @rlb under the given @name: clear the
+ * structure, set up the connection lists, hand every connection slot
+ * to rlb_conn_free(), copy the name, open the listening socket and
+ * take an initial timestamp.
+ *
+ * Returns 0 on success. On failure a warning is logged, the partially
+ * initialized bridge is destroyed and a negative errno is returned.
+ */
+static int
+rlb_create(td_rlb_t *rlb, const char *name)
+{
+	int slot, err;
+
+	memset(rlb, 0, sizeof(*rlb));
+	INIT_LIST_HEAD(&rlb->open);
+	INIT_LIST_HEAD(&rlb->wait);
+	rlb->sock = -1;
+
+	/* slots are released from the highest index down to 0 */
+	slot = RLB_CONN_MAX;
+	while (--slot >= 0)
+		rlb_conn_free(rlb, &rlb->connv[slot]);
+
+	rlb->name = strdup(name);
+	if (!rlb->name) {
+		err = -errno;
+		goto fail;
+	}
+
+	err = rlb_sock_open(rlb);
+	if (err)
+		goto fail;
+
+	gettimeofday(&rlb->now, NULL);
+
+	return 0;
+
+fail:
+	WARN("err = %d", err);
+	rlb_destroy(rlb);
+	return err;
+}
+
+/*
+ * Instantiate a valve of type @name into @v from @argc/@argv.
+ *
+ * v->ops is assigned before the create hook runs, so it stays set even
+ * if creation fails. Returns -ESRCH for an unknown type, otherwise the
+ * result of the type's create hook.
+ */
+static int
+rlb_create_valve(td_rlb_t *rlb, struct rlb_valve *v,
+		 const char *name, int argc, char **argv)
+{
+	struct ratelimit_ops *found = rlb_find_valve(name);
+
+	if (!found)
+		return -ESRCH;
+
+	v->ops = found;
+
+	return found->create(rlb, argc, argv, &v->data);
+}
+
+/*
+ * Switch logging to syslog: open the log as "<name>[<pid>]" with the
+ * given @facility and route rlb_vlog to vsyslog.
+ */
+static void
+rlb_openlog(const char *name, int facility)
+{
+	/* static storage: the buffer is handed to openlog() and so must
+	   outlive this call */
+	static char ident[32];
+	const size_t last = sizeof(ident) - 1;
+
+	snprintf(ident, sizeof(ident), "%s[%d]", name, getpid());
+	ident[last] = 0;
+
+	openlog(ident, 0, facility);
+
+	rlb_vlog = vsyslog;
+}
+
+/*
+ * Entry point of the rate-limit bridge.
+ *
+ * Parses the global options (-h/--help, -t/--type, -D/--debug=<n>),
+ * creates the bridge named by the first positional argument and the
+ * valve of the requested type from the remaining arguments, then runs
+ * the main loop. Unless a non-zero debug level was given, the process
+ * daemonizes and logs to syslog.
+ *
+ * --debug is declared as taking a required argument, consistent with
+ * the short option "D:" and with the handler, which reads optarg.
+ *
+ * Returns 0 on success, otherwise the positive errno value of the
+ * failure (EINVAL for usage errors).
+ */
+int
+main(int argc, char **argv)
+{
+	td_rlb_t _rlb, *rlb;
+	const char *prog, *type;
+	int err;
+
+	setbuf(stdin, NULL);
+	setlinebuf(stderr);
+
+	rlb = NULL;
+	prog = basename(argv[0]);
+	type = NULL;
+	rlb_vlog = rlb_vlog_vfprintf;
+
+	do {
+		const struct option longopts[] = {
+			{ "help", 0, NULL, 'h' },
+			{ "type", 1, NULL, 't' },
+			{ "debug", 1, NULL, 'D' },
+			{ NULL, 0, NULL, 0 },
+		};
+		int c;
+
+		c = getopt_long(argc, argv, "ht:D:", longopts, NULL);
+		if (c < 0)
+			break;
+
+		switch (c) {
+		case 'h':
+			rlb_usage(NULL, prog, stdout);
+			return 0;
+
+		case 't':
+			type = optarg;
+			break;
+
+		case 'D':
+			debug = strtoul(optarg, NULL, 0);
+			break;
+
+		case '?':
+			goto usage;
+
+		default:
+			BUG();
+		}
+
+	} while (1);
+
+	if (!type)
+		goto usage;
+
+	/* the bridge name is a mandatory positional argument */
+	if (argc - optind < 1)
+		goto usage;
+
+	err = rlb_siginit();
+	if (err)
+		goto fail;
+
+	err = rlb_create(&_rlb, argv[optind++]);
+	if (err)
+		goto fail;
+
+	/* from here on the fail path has a bridge to destroy */
+	rlb = &_rlb;
+
+	rlb_argv_shift(&optind, &argc, &argv);
+
+	err = rlb_create_valve(rlb, &rlb->valve, type, argc, argv);
+	if (err) {
+		if (err == -EINVAL)
+			goto usage;
+		goto fail;
+	}
+
+	if (!debug) {
+		err = daemon(0, 0);
+		if (err)
+			goto fail;
+
+		/* the main loop tests stdin for NULL before polling it */
+		stdin = stdout = stderr = NULL;
+		rlb_openlog(prog, LOG_DAEMON);
+	}
+
+	INFO("TD ratelimit bridge: %s, pid %d", rlb->path, getpid());
+
+	rlb_info(rlb);
+
+	err = rlb_main_run(rlb);
+
+	if (err)
+		INFO("Exiting with status %d", -err);
+
+fail:
+	if (rlb)
+		rlb_destroy(rlb);
+
+	return -err;
+
+usage:
+	rlb_usage(rlb, prog, stderr);
+	err = -EINVAL;
+	goto fail;
+}