]> xenbits.xensource.com Git - people/dstodden/blktap.git/commitdiff
CA-44322: Add an I/O-submit thread.
authorDaniel Stodden <daniel.stodden@citrix.com>
Thu, 30 Sep 2010 21:01:45 +0000 (14:01 -0700)
committerDaniel Stodden <daniel.stodden@citrix.com>
Thu, 30 Sep 2010 21:01:45 +0000 (14:01 -0700)
Slightly annoying to add threads to core blktap code, but necessary to
avoid potential starvation when dom0 gets under memory
pressure. Blktap can guarantee io_submit makes progress by keeping
memory reserves, but not enough to guarantee that it's
non-blocking. To refill the reserves, we want completion of in-flight
I/O the main even loop.

Signed-off-by: Daniel Stodden <daniel.stodden@citrix.comm>
drivers/tapdisk-queue.c
drivers/tapdisk-queue.h

index 1353b10812afe47ac6c400be0b11f72995acf334..9d5a2c8d16b84a0343b4ad663e84ef6ae30845cd 100644 (file)
  */
 #define REQUEST_ASYNC_FD ((io_context_t)1)
 
-static inline void
-queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
-{
-       struct iocb *iocb = &tiocb->iocb;
-
-       if (queue->queued) {
-               struct tiocb *prev = (struct tiocb *)
-                       queue->iocbs[queue->queued - 1]->data;
-               prev->next = tiocb;
-       }
-
-       queue->iocbs[queue->queued++] = iocb;
-}
-
-static inline int
-deferred_tiocbs(struct tqueue *queue)
-{
-       return (queue->deferred.head != NULL);
-}
-
-static inline void
-defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
-{
-       struct tlist *list = &queue->deferred;
-
-       if (!list->head)
-               list->head = list->tail = tiocb;
-       else
-               list->tail = list->tail->next = tiocb;
-
-       queue->tiocbs_deferred++;
-       queue->deferrals++;
-}
-
-static inline void
-queue_deferred_tiocb(struct tqueue *queue)
-{
-       struct tlist *list = &queue->deferred;
-
-       if (list->head) {
-               struct tiocb *tiocb = list->head;
-
-               list->head = tiocb->next;
-               if (!list->head)
-                       list->tail = NULL;
-
-               queue_tiocb(queue, tiocb);
-               queue->tiocbs_deferred--;
-       }
-}
+#define for_each_tiocb(_tiocb, _list) \
+       list_for_each_entry(_tiocb, _list, entry)
 
-static inline void
-queue_deferred_tiocbs(struct tqueue *queue)
-{
-       while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue))
-               queue_deferred_tiocb(queue);
-}
+#define for_each_tiocb_safe(_tiocb, _next, _list) \
+       list_for_each_entry_safe(_tiocb, _next, _list, entry)
 
 /*
  * td_complete may queue more tiocbs
@@ -124,6 +72,9 @@ complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
        int err;
        struct iocb *iocb = &tiocb->iocb;
 
+       list_del(&tiocb->entry);
+       pthread_mutex_unlock(&queue->mutex);
+
        if (res == iocb->u.c.nbytes)
                err = 0;
        else if ((int)res < 0)
@@ -132,30 +83,21 @@ complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
                err = -EIO;
 
        tiocb->cb(tiocb->arg, tiocb, err);
+
+       pthread_mutex_lock(&queue->mutex);
 }
 
 static int
 cancel_tiocbs(struct tqueue *queue, int err)
 {
-       int queued;
-       struct tiocb *tiocb;
+       struct tiocb *tiocb, *next;
+       int queued = queue->queued;
 
-       if (!queue->queued)
-               return 0;
+       for_each_tiocb_safe(tiocb, next, &queue->pending)
+               complete_tiocb(queue, tiocb, err);
 
-       /* 
-        * td_complete may queue more tiocbs, which
-        * will overwrite the contents of queue->iocbs.
-        * use a private linked list to keep track
-        * of the tiocbs we're cancelling. 
-        */
-       tiocb  = queue->iocbs[0]->data;
-       queued = queue->queued;
        queue->queued = 0;
 
-       for (; tiocb != NULL; tiocb = tiocb->next)
-               complete_tiocb(queue, tiocb, err);
-
        return queued;
 }
 
@@ -245,7 +187,10 @@ tapdisk_rwio_submit(struct tqueue *queue)
                ep      = rwio->aio_events + i;
                iocb    = queue->iocbs[i];
                ep->obj = iocb;
+
+               pthread_mutex_unlock(&queue->mutex);
                ep->res = tapdisk_rwio_rw(iocb);
+               pthread_mutex_lock(&queue->mutex);
        }
 
        split = io_split(&queue->opioctx, rwio->aio_events, merged);
@@ -257,8 +202,6 @@ tapdisk_rwio_submit(struct tqueue *queue)
                complete_tiocb(queue, tiocb, ep->res);
        }
 
-       queue_deferred_tiocbs(queue);
-
        return split;
 }
 
@@ -448,6 +391,7 @@ tapdisk_lio_event(event_id_t id, char mode, void *private)
        struct io_event *ep;
 
        tapdisk_lio_ack_event(queue);
+       pthread_mutex_lock(&queue->mutex);
 
        lio   = queue->tio_data;
        ret   = io_getevents(lio->aio_ctx, 0,
@@ -466,7 +410,7 @@ tapdisk_lio_event(event_id_t id, char mode, void *private)
                complete_tiocb(queue, tiocb, ep->res);
        }
 
-       queue_deferred_tiocbs(queue);
+       pthread_mutex_unlock(&queue->mutex);
 }
 
 static int
@@ -516,7 +460,10 @@ tapdisk_lio_submit(struct tqueue *queue)
        tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
        merged    = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
        tapdisk_lio_set_eventfd(queue, merged, queue->iocbs);
+
+       pthread_mutex_unlock(&queue->mutex);
        submitted = io_submit(lio->aio_ctx, merged, queue->iocbs);
+       pthread_mutex_lock(&queue->mutex);
 
        DBG("queued: %d, merged: %d, submitted: %d\n",
            queue->queued, merged, submitted);
@@ -603,6 +550,121 @@ fail:
        return err;
 }
 
+static int
+tapdisk_queue_full(struct tqueue *queue)
+{
+       return queue->tiocbs_pending + queue->queued >= queue->size;
+}
+
+static void
+tapdisk_queue_dequeue_tiocbs(struct tqueue *queue)
+{
+       struct tiocb *tiocb, *next;
+
+       /* NB. queue lock held */
+
+       for_each_tiocb_safe(tiocb, next, &queue->waiting) {
+               if (tapdisk_queue_full(queue))
+                       break;
+
+               list_move_tail(&tiocb->entry, &queue->pending);
+               queue->iocbs[queue->queued++] = &tiocb->iocb;
+       }
+}
+
+/*
+ * NB. Dispatcher thread.
+ *
+ * I/O may block. Not only synchronous I/O, but because I/O takes
+ * memory, so even io_submit may block. The funny part about I/O
+ * taking memory is that freeing memory typically takes I/O.
+ *
+ * Blktap makes sure that we're served from a bunch of private memory
+ * pools to solve the general deadlock hazard. Unfortunately such
+ * pools are not big enough to cover all tapdev I/O in flight.
+ *
+ * The consequence is that, once pools are exhausted, I/O submission
+ * still blocks. It will immediately wake again once the pools are
+ * refilled, but refilling them typically takes I/O completion.
+ *
+ * We run I/O completion from the main event loop. But submission
+ * therefore must go separate.
+ */
+
+static void*
+tapdisk_queue_thread_run(void *arg)
+{
+       struct tqueue *queue = arg;
+
+       pthread_mutex_lock(&queue->mutex);
+       do {
+               do {
+                       if (queue->closing)
+                               goto out;
+
+                       if (tapdisk_queue_full(queue))
+                               goto wait;
+
+                       if (!list_empty(&queue->waiting))
+                               break;
+
+               wait:
+                       pthread_cond_wait(&queue->cond, &queue->mutex);
+               } while (1);
+
+               tapdisk_queue_dequeue_tiocbs(queue);
+
+               queue->tio->tio_submit(queue);
+       } while (1);
+
+out:
+       pthread_mutex_unlock(&queue->mutex);
+       return NULL;
+}
+
+static int
+tapdisk_queue_thread_start(struct tqueue *queue)
+{
+       int err;
+
+       err = pthread_mutex_init(&queue->mutex, NULL);
+       if (err)
+               return -err;
+
+       err = pthread_cond_init(&queue->cond, NULL);
+       if (err)
+               return -err;
+
+       queue->closing = 0;
+
+       err = pthread_create(&queue->thread, NULL,
+                            tapdisk_queue_thread_run, queue);
+       if (err)
+               return -err;
+
+       return 0;
+}
+
+static void
+tapdisk_queue_thread_kick(struct tqueue *queue)
+{
+       pthread_mutex_lock(&queue->mutex);
+       pthread_cond_signal(&queue->cond);
+       pthread_mutex_unlock(&queue->mutex);
+}
+
+static void
+tapdisk_queue_thread_exit(struct tqueue *queue)
+{
+       if (queue->thread) {
+               queue->closing = 1;
+               tapdisk_queue_thread_kick(queue);
+
+               pthread_join(queue->thread, NULL);
+               queue->thread = 0;
+       }
+}
+
 int
 tapdisk_init_queue(struct tqueue *queue, int size,
                   int drv, struct tfilter *filter)
@@ -613,6 +675,8 @@ tapdisk_init_queue(struct tqueue *queue, int size,
 
        queue->size   = size;
        queue->filter = filter;
+       INIT_LIST_HEAD(&queue->waiting);
+       INIT_LIST_HEAD(&queue->pending);
 
        if (!size)
                return 0;
@@ -631,6 +695,10 @@ tapdisk_init_queue(struct tqueue *queue, int size,
        if (err)
                goto fail;
 
+       err = tapdisk_queue_thread_start(queue);
+       if (err)
+               goto fail;
+
        return 0;
 
  fail:
@@ -643,6 +711,8 @@ tapdisk_free_queue(struct tqueue *queue)
 {
        tapdisk_queue_free_io(queue);
 
+       tapdisk_queue_thread_exit(queue);
+
        free(queue->iocbs);
        queue->iocbs = NULL;
 
@@ -652,24 +722,35 @@ tapdisk_free_queue(struct tqueue *queue)
 void 
 tapdisk_debug_queue(struct tqueue *queue)
 {
-       struct tiocb *tiocb = queue->deferred.head;
+       struct tiocb *tiocb;
 
        WARN("TAPDISK QUEUE:\n");
        WARN("size: %d, tio: %s, queued: %d, iocbs_pending: %d, "
-            "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %llx\n",
+            "tiocbs_pending: %d\n",
             queue->size, queue->tio->name, queue->queued, queue->iocbs_pending,
-            queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);
-
-       if (tiocb) {
-               WARN("deferred:\n");
-               for (; tiocb != NULL; tiocb = tiocb->next) {
-                       struct iocb *io = &tiocb->iocb;
-                       WARN("%s of %lu bytes at %lld\n",
-                            (io->aio_lio_opcode == IO_CMD_PWRITE ?
-                             "write" : "read"),
-                            io->u.c.nbytes, io->u.c.offset);
-               }
+            queue->tiocbs_pending);
+
+       pthread_mutex_lock(&queue->mutex);
+
+       WARN("pending:\n");
+       for_each_tiocb(tiocb, &queue->pending) {
+               struct iocb *io = &tiocb->iocb;
+               WARN("%s of %lu bytes at %lld\n",
+                    (io->aio_lio_opcode == IO_CMD_PWRITE ?
+                     "write" : "read"),
+                    io->u.c.nbytes, io->u.c.offset);
+       }
+
+       WARN("deferred:\n");
+       for_each_tiocb(tiocb, &queue->waiting) {
+               struct iocb *io = &tiocb->iocb;
+               WARN("%s of %lu bytes at %lld\n",
+                    (io->aio_lio_opcode == IO_CMD_PWRITE ?
+                     "write" : "read"),
+                    io->u.c.nbytes, io->u.c.offset);
        }
+
+       pthread_mutex_unlock(&queue->mutex);
 }
 
 void
@@ -686,57 +767,19 @@ tapdisk_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
        iocb->data  = tiocb;
        tiocb->cb   = cb;
        tiocb->arg  = arg;
-       tiocb->next = NULL;
 }
 
 void
 tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
 {
-       if (!tapdisk_queue_full(queue))
-               queue_tiocb(queue, tiocb);
-       else
-               defer_tiocb(queue, tiocb);
-}
-
-
-/*
- * fail_tiocbs may queue more tiocbs
- */
-int
-tapdisk_submit_tiocbs(struct tqueue *queue)
-{
-       return queue->tio->tio_submit(queue);
+       pthread_mutex_lock(&queue->mutex);
+       list_add_tail(&tiocb->entry, &queue->waiting);
+       pthread_mutex_unlock(&queue->mutex);
 }
 
-int
+void
 tapdisk_submit_all_tiocbs(struct tqueue *queue)
 {
-       int submitted = 0;
-
-       do {
-               submitted += tapdisk_submit_tiocbs(queue);
-       } while (!tapdisk_queue_empty(queue));
-
-       return submitted;
-}
-
-/*
- * cancel_tiocbs may queue more tiocbs
- */
-int
-tapdisk_cancel_tiocbs(struct tqueue *queue)
-{
-       return cancel_tiocbs(queue, -EIO);
-}
-
-int
-tapdisk_cancel_all_tiocbs(struct tqueue *queue)
-{
-       int cancelled = 0;
-
-       do {
-               cancelled += tapdisk_cancel_tiocbs(queue);
-       } while (!tapdisk_queue_empty(queue));
-
-       return cancelled;
+       if (!list_empty(&queue->waiting))
+               tapdisk_queue_thread_kick(queue);
 }
index 9892c0dceb3182b8dc5207d8fa9f47ffbf8c5a50..ef386b2f7c70ec6755ee5a9d3b2dccd1b64171fa 100644 (file)
@@ -6,6 +6,7 @@
 #define TAPDISK_QUEUE_H
 
 #include <libaio.h>
+#include <pthread.h>
 
 #include "io-optimize.h"
 #include "scheduler.h"
@@ -15,18 +16,12 @@ struct tfilter;
 
 typedef void (*td_queue_callback_t)(void *arg, struct tiocb *, int err);
 
-
 struct tiocb {
        td_queue_callback_t   cb;
        void                 *arg;
 
        struct iocb           iocb;
-       struct tiocb         *next;
-};
-
-struct tlist {
-       struct tiocb         *head;
-       struct tiocb         *tail;
+       struct list_head      entry;
 };
 
 struct tqueue {
@@ -37,27 +32,24 @@ struct tqueue {
 
        struct opioctx        opioctx;
 
-       int                   queued;
        struct iocb         **iocbs;
+       int                   queued;
 
-       /* number of iocbs pending in the aio layer */
-       int                   iocbs_pending;
+       struct list_head      waiting; /* tiocbs deferred */
+       struct list_head      pending; /* tiocbs submitted */
 
-       /* number of tiocbs pending in the queue -- 
-        * this is likely to be larger than iocbs_pending 
-        * due to request coalescing */
+       /* iocbs <= tiocbs pending, due to coalescing */
+       int                   iocbs_pending;
        int                   tiocbs_pending;
 
-       /* iocbs may be deferred if the aio ring is full.
-        * tapdisk_queue_complete will ensure deferred
-        * iocbs are queued as slots become available. */
-       struct tlist          deferred;
-       int                   tiocbs_deferred;
-
        /* optional tapdisk filter */
        struct tfilter       *filter;
 
-       uint64_t              deferrals;
+       /* tio_submit thread */
+       pthread_t             thread;
+       pthread_cond_t        cond;
+       pthread_mutex_t       mutex;
+       int                   closing;
 };
 
 struct tio {
@@ -83,16 +75,12 @@ enum {
  * The *_all_tiocbs variants will handle the first two cases;
  * be sure to call submit after calling complete in the third case.
  */
-#define tapdisk_queue_count(q) ((q)->queued)
-#define tapdisk_queue_empty(q) ((q)->queued == 0)
-#define tapdisk_queue_full(q)  \
-       (((q)->tiocbs_pending + (q)->queued) >= (q)->size)
 int tapdisk_init_queue(struct tqueue *, int size, int drv, struct tfilter *);
 void tapdisk_free_queue(struct tqueue *);
 void tapdisk_debug_queue(struct tqueue *);
 void tapdisk_queue_tiocb(struct tqueue *, struct tiocb *);
 int tapdisk_submit_tiocbs(struct tqueue *);
-int tapdisk_submit_all_tiocbs(struct tqueue *);
+void tapdisk_submit_all_tiocbs(struct tqueue *);
 int tapdisk_cancel_tiocbs(struct tqueue *);
 int tapdisk_cancel_all_tiocbs(struct tqueue *);
 void tapdisk_prep_tiocb(struct tiocb *, int, int, char *, size_t,