*/
#define REQUEST_ASYNC_FD ((io_context_t)1)
-static inline void
-queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
-{
- struct iocb *iocb = &tiocb->iocb;
-
- if (queue->queued) {
- struct tiocb *prev = (struct tiocb *)
- queue->iocbs[queue->queued - 1]->data;
- prev->next = tiocb;
- }
-
- queue->iocbs[queue->queued++] = iocb;
-}
-
-static inline int
-deferred_tiocbs(struct tqueue *queue)
-{
- return (queue->deferred.head != NULL);
-}
-
-static inline void
-defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
-{
- struct tlist *list = &queue->deferred;
-
- if (!list->head)
- list->head = list->tail = tiocb;
- else
- list->tail = list->tail->next = tiocb;
-
- queue->tiocbs_deferred++;
- queue->deferrals++;
-}
-
-static inline void
-queue_deferred_tiocb(struct tqueue *queue)
-{
- struct tlist *list = &queue->deferred;
-
- if (list->head) {
- struct tiocb *tiocb = list->head;
-
- list->head = tiocb->next;
- if (!list->head)
- list->tail = NULL;
-
- queue_tiocb(queue, tiocb);
- queue->tiocbs_deferred--;
- }
-}
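+/*
+ * tiocbs are linked through tiocb->entry: queued on queue->waiting,
+ * then moved to queue->pending once dequeued for submission.
+ */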
+#define for_each_tiocb(_tiocb, _list) \
+ list_for_each_entry(_tiocb, _list, entry)
-static inline void
-queue_deferred_tiocbs(struct tqueue *queue)
-{
- while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue))
- queue_deferred_tiocb(queue);
-}
+#define for_each_tiocb_safe(_tiocb, _next, _list) \
+ list_for_each_entry_safe(_tiocb, _next, _list, entry)
/*
* td_complete may queue more tiocbs
int err;
struct iocb *iocb = &tiocb->iocb;
+ list_del(&tiocb->entry);
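+
+ /* drop the queue lock across the callback: td_complete may
+ * queue more tiocbs, and tapdisk_queue_tiocb takes the lock
+ * itself */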
+ pthread_mutex_unlock(&queue->mutex);
+
if (res == iocb->u.c.nbytes)
err = 0;
else if ((int)res < 0)
err = (int)res;
else
err = -EIO;
tiocb->cb(tiocb->arg, tiocb, err);
+
+ pthread_mutex_lock(&queue->mutex);
}
static int
cancel_tiocbs(struct tqueue *queue, int err)
{
- int queued;
- struct tiocb *tiocb;
+ struct tiocb *tiocb, *next;
+ int queued = queue->queued;
- if (!queue->queued)
- return 0;
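+
+ /* complete_tiocb drops and retakes the queue lock; the _safe
+ * iterator keeps the walk valid across its list_del */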
+ for_each_tiocb_safe(tiocb, next, &queue->pending)
+ complete_tiocb(queue, tiocb, err);
- /*
- * td_complete may queue more tiocbs, which
- * will overwrite the contents of queue->iocbs.
- * use a private linked list to keep track
- * of the tiocbs we're cancelling.
- */
- tiocb = queue->iocbs[0]->data;
- queued = queue->queued;
queue->queued = 0;
- for (; tiocb != NULL; tiocb = tiocb->next)
- complete_tiocb(queue, tiocb, err);
-
return queued;
}
ep = rwio->aio_events + i;
iocb = queue->iocbs[i];
ep->obj = iocb;
+
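+ /* synchronous read/write may block; drop the queue lock so
+ * queueing and completion can proceed meanwhile */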
+ pthread_mutex_unlock(&queue->mutex);
ep->res = tapdisk_rwio_rw(iocb);
+ pthread_mutex_lock(&queue->mutex);
}
split = io_split(&queue->opioctx, rwio->aio_events, merged);
complete_tiocb(queue, tiocb, ep->res);
}
- queue_deferred_tiocbs(queue);
-
return split;
}
struct io_event *ep;
tapdisk_lio_ack_event(queue);
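+ /* completion runs from the main event loop; take the queue
+ * lock before consuming events and completing tiocbs */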
+ pthread_mutex_lock(&queue->mutex);
lio = queue->tio_data;
ret = io_getevents(lio->aio_ctx, 0,
complete_tiocb(queue, tiocb, ep->res);
}
- queue_deferred_tiocbs(queue);
+ pthread_mutex_unlock(&queue->mutex);
}
static int
tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
tapdisk_lio_set_eventfd(queue, merged, queue->iocbs);
+
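+ /* io_submit may block on memory (see the dispatcher comment
+ * below); drop the queue lock for the duration */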
+ pthread_mutex_unlock(&queue->mutex);
submitted = io_submit(lio->aio_ctx, merged, queue->iocbs);
+ pthread_mutex_lock(&queue->mutex);
DBG("queued: %d, merged: %d, submitted: %d\n",
queue->queued, merged, submitted);
return err;
}
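+/*
+ * Bound the work in flight: tiocbs submitted and awaiting completion
+ * (tiocbs_pending) plus those dequeued for submission (queued) may
+ * not exceed the queue size.
+ */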
+static int
+tapdisk_queue_full(struct tqueue *queue)
+{
+ return queue->tiocbs_pending + queue->queued >= queue->size;
+}
+
+static void
+tapdisk_queue_dequeue_tiocbs(struct tqueue *queue)
+{
+ struct tiocb *tiocb, *next;
+
+ /* NB. queue lock held */
+
+ for_each_tiocb_safe(tiocb, next, &queue->waiting) {
+ if (tapdisk_queue_full(queue))
+ break;
+
+ list_move_tail(&tiocb->entry, &queue->pending);
+ queue->iocbs[queue->queued++] = &tiocb->iocb;
+ }
+}
+
+/*
+ * NB. Dispatcher thread.
+ *
+ * I/O may block. Not only synchronous I/O: submission itself takes
+ * memory, so even io_submit may block. The irony of I/O taking
+ * memory is that freeing memory typically takes I/O.
+ *
+ * Blktap guards against that general deadlock hazard by serving us
+ * from private memory pools. Unfortunately, those pools are not big
+ * enough to cover all tapdev I/O in flight.
+ *
+ * The consequence is that, once the pools are exhausted, I/O
+ * submission still blocks. It wakes again as soon as the pools are
+ * refilled, but refilling them typically takes I/O completion.
+ *
+ * We run I/O completion from the main event loop, so submission
+ * must run on a separate thread.
+ */
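+
+/*
+ * Typical flow, as a sketch:
+ *
+ *   tapdisk_queue_tiocb(queue, tiocb);    appends to the waiting list
+ *   tapdisk_submit_all_tiocbs(queue);     kicks the dispatcher
+ *
+ * The dispatcher thread dequeues into queue->iocbs and calls
+ * tio_submit; completions run from the main event loop.
+ */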
+
+static void*
+tapdisk_queue_thread_run(void *arg)
+{
+ struct tqueue *queue = arg;
+
+ pthread_mutex_lock(&queue->mutex);
+ do {
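+ /* sleep until there are tiocbs to dequeue and room in the
+ * ring, or until asked to exit */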
+ do {
+ if (queue->closing)
+ goto out;
+
+ if (tapdisk_queue_full(queue))
+ goto wait;
+
+ if (!list_empty(&queue->waiting))
+ break;
+
+ wait:
+ pthread_cond_wait(&queue->cond, &queue->mutex);
+ } while (1);
+
+ tapdisk_queue_dequeue_tiocbs(queue);
+
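+ /* tio_submit drops the queue lock around the potentially
+ * blocking submission call itself */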
+ queue->tio->tio_submit(queue);
+ } while (1);
+
+out:
+ pthread_mutex_unlock(&queue->mutex);
+ return NULL;
+}
+
+static int
+tapdisk_queue_thread_start(struct tqueue *queue)
+{
+ int err;
+
+ err = pthread_mutex_init(&queue->mutex, NULL);
+ if (err)
+ return -err;
+
+ err = pthread_cond_init(&queue->cond, NULL);
+ if (err)
+ return -err;
+
+ queue->closing = 0;
+
+ err = pthread_create(&queue->thread, NULL,
+ tapdisk_queue_thread_run, queue);
+ if (err)
+ return -err;
+
+ return 0;
+}
+
+static void
+tapdisk_queue_thread_kick(struct tqueue *queue)
+{
+ pthread_mutex_lock(&queue->mutex);
+ pthread_cond_signal(&queue->cond);
+ pthread_mutex_unlock(&queue->mutex);
+}
+
+static void
+tapdisk_queue_thread_exit(struct tqueue *queue)
+{
+ if (queue->thread) {
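+ /* set without the lock: the kick below takes and drops the
+ * mutex, publishing the store before the thread re-checks
+ * queue->closing */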
+ queue->closing = 1;
+ tapdisk_queue_thread_kick(queue);
+
+ pthread_join(queue->thread, NULL);
+ queue->thread = 0;
+ }
+}
+
int
tapdisk_init_queue(struct tqueue *queue, int size,
int drv, struct tfilter *filter)
queue->size = size;
queue->filter = filter;
+ INIT_LIST_HEAD(&queue->waiting);
+ INIT_LIST_HEAD(&queue->pending);
if (!size)
return 0;
if (err)
goto fail;
+ err = tapdisk_queue_thread_start(queue);
+ if (err)
+ goto fail;
+
return 0;
fail:
{
tapdisk_queue_free_io(queue);
+ tapdisk_queue_thread_exit(queue);
+
free(queue->iocbs);
queue->iocbs = NULL;
void
tapdisk_debug_queue(struct tqueue *queue)
{
- struct tiocb *tiocb = queue->deferred.head;
+ struct tiocb *tiocb;
WARN("TAPDISK QUEUE:\n");
WARN("size: %d, tio: %s, queued: %d, iocbs_pending: %d, "
- "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %llx\n",
+ "tiocbs_pending: %d\n",
queue->size, queue->tio->name, queue->queued, queue->iocbs_pending,
- queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);
-
- if (tiocb) {
- WARN("deferred:\n");
- for (; tiocb != NULL; tiocb = tiocb->next) {
- struct iocb *io = &tiocb->iocb;
- WARN("%s of %lu bytes at %lld\n",
- (io->aio_lio_opcode == IO_CMD_PWRITE ?
- "write" : "read"),
- io->u.c.nbytes, io->u.c.offset);
- }
+ queue->tiocbs_pending);
+
+ pthread_mutex_lock(&queue->mutex);
+
+ WARN("pending:\n");
+ for_each_tiocb(tiocb, &queue->pending) {
+ struct iocb *io = &tiocb->iocb;
+ WARN("%s of %lu bytes at %lld\n",
+ (io->aio_lio_opcode == IO_CMD_PWRITE ?
+ "write" : "read"),
+ io->u.c.nbytes, io->u.c.offset);
+ }
+
+ WARN("waiting:\n");
+ for_each_tiocb(tiocb, &queue->waiting) {
+ struct iocb *io = &tiocb->iocb;
+ WARN("%s of %lu bytes at %lld\n",
+ (io->aio_lio_opcode == IO_CMD_PWRITE ?
+ "write" : "read"),
+ io->u.c.nbytes, io->u.c.offset);
}
+
+ pthread_mutex_unlock(&queue->mutex);
}
void
iocb->data = tiocb;
tiocb->cb = cb;
tiocb->arg = arg;
- tiocb->next = NULL;
}
void
tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
- if (!tapdisk_queue_full(queue))
- queue_tiocb(queue, tiocb);
- else
- defer_tiocb(queue, tiocb);
-}
-
-
-/*
- * fail_tiocbs may queue more tiocbs
- */
-int
-tapdisk_submit_tiocbs(struct tqueue *queue)
-{
- return queue->tio->tio_submit(queue);
+ pthread_mutex_lock(&queue->mutex);
+ list_add_tail(&tiocb->entry, &queue->waiting);
+ pthread_mutex_unlock(&queue->mutex);
}
-int
+void
tapdisk_submit_all_tiocbs(struct tqueue *queue)
{
- int submitted = 0;
-
- do {
- submitted += tapdisk_submit_tiocbs(queue);
- } while (!tapdisk_queue_empty(queue));
-
- return submitted;
-}
-
-/*
- * cancel_tiocbs may queue more tiocbs
- */
-int
-tapdisk_cancel_tiocbs(struct tqueue *queue)
-{
- return cancel_tiocbs(queue, -EIO);
-}
-
-int
-tapdisk_cancel_all_tiocbs(struct tqueue *queue)
-{
- int cancelled = 0;
-
- do {
- cancelled += tapdisk_cancel_tiocbs(queue);
- } while (!tapdisk_queue_empty(queue));
-
- return cancelled;
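+ /* unlocked peek is safe here: the caller itself queued these
+ * tiocbs, and a spurious kick is harmless */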
+ if (!list_empty(&queue->waiting))
+ tapdisk_queue_thread_kick(queue);
}