#include "tapdisk-utils.h"
#include "libaio-compat.h"
-#include "blktap2.h"
-#include "tapdisk-storage.h"
#include "atomicio.h"
#define WARN(_f, _a...) tlog_write(TLOG_WARN, _f, ##_a)
#define REQUEST_ASYNC_FD ((io_context_t)1)
-#define for_each_tiocb(_tiocb, _list) \
-	list_for_each_entry(_tiocb, _list, entry)
-
-#define for_each_tiocb_safe(_tiocb, _next, _list) \
-	list_for_each_entry_safe(_tiocb, _next, _list, entry)
-
+static inline void
+queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
+{
+ struct iocb *iocb = &tiocb->iocb;
+
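+	/* link this tiocb after the previous one in the batch: the
+	 * iocb->data back-pointer (set in tapdisk_prep_tiocb) yields
+	 * the tail tiocb, and the ->next chain built here is what
+	 * cancel_tiocbs walks once queue->iocbs has been reset */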
+ if (queue->queued) {
+ struct tiocb *prev = (struct tiocb *)
+ queue->iocbs[queue->queued - 1]->data;
+ prev->next = tiocb;
+ }
+
+ queue->iocbs[queue->queued++] = iocb;
+}
+
+static inline int
+deferred_tiocbs(struct tqueue *queue)
+{
+ return (queue->deferred.head != NULL);
+}
+
+static inline void
+defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
+{
+ struct tlist *list = &queue->deferred;
+
+ if (!list->head)
+ list->head = list->tail = tiocb;
+ else
+ list->tail = list->tail->next = tiocb;
+
+ queue->tiocbs_deferred++;
+ queue->deferrals++;
+}
+
+static inline void
+queue_deferred_tiocb(struct tqueue *queue)
+{
+ struct tlist *list = &queue->deferred;
+
+ if (list->head) {
+ struct tiocb *tiocb = list->head;
+
+ list->head = tiocb->next;
+ if (!list->head)
+ list->tail = NULL;
+
+ queue_tiocb(queue, tiocb);
+ queue->tiocbs_deferred--;
+ }
+}
+
+static inline void
+queue_deferred_tiocbs(struct tqueue *queue)
+{
+ while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue))
+ queue_deferred_tiocb(queue);
+}
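The deferral machinery above is an intrusive, singly linked FIFO: defer_tiocb() appends at the tail so deferred requests keep their arrival order, and queue_deferred_tiocbs() refills the iocb ring from the head only while there is room. A self-contained sketch of the same pattern, under hypothetical names (struct node, struct fifo, fifo_push, fifo_pop; none of these appear in the tapdisk sources):

#include <assert.h>
#include <stddef.h>

struct node {
	struct node *next;
};

struct fifo {
	struct node *head;
	struct node *tail;
};

/* append at the tail, preserving arrival order */
static void
fifo_push(struct fifo *f, struct node *n)
{
	n->next = NULL;
	if (!f->head)
		f->head = f->tail = n;
	else
		f->tail = f->tail->next = n;
}

/* detach and return the head, or NULL when empty */
static struct node *
fifo_pop(struct fifo *f)
{
	struct node *n = f->head;

	if (n) {
		f->head = n->next;
		if (!f->head)
			f->tail = NULL;
	}

	return n;
}

int
main(void)
{
	struct fifo f = { NULL, NULL };
	struct node a, b;

	fifo_push(&f, &a);
	fifo_push(&f, &b);
	assert(fifo_pop(&f) == &a);	/* FIFO order */
	assert(fifo_pop(&f) == &b);
	assert(fifo_pop(&f) == NULL);

	return 0;
}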
/*
 * td_complete may queue more tiocbs
 */
static void
complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
{
int err;
struct iocb *iocb = &tiocb->iocb;
- list_del(&tiocb->entry);
- pthread_mutex_unlock(&queue->mutex);
-
if (res == iocb->u.c.nbytes)
err = 0;
	else if ((int)res < 0)
		err = (int)res;
	else
		err = -EIO;
tiocb->cb(tiocb->arg, tiocb, err);
-
- pthread_mutex_lock(&queue->mutex);
}
static int
cancel_tiocbs(struct tqueue *queue, int err)
{
- struct tiocb *tiocb, *next;
- int queued = queue->queued;
+ int queued;
+ struct tiocb *tiocb;
- for_each_tiocb_safe(tiocb, next, &queue->pending)
- complete_tiocb(queue, tiocb, err);
+ if (!queue->queued)
+ return 0;
+ /*
+ * td_complete may queue more tiocbs, which
+ * will overwrite the contents of queue->iocbs.
+ * use a private linked list to keep track
+ * of the tiocbs we're cancelling.
+ */
+ tiocb = queue->iocbs[0]->data;
+ queued = queue->queued;
queue->queued = 0;
+ for (; tiocb != NULL; tiocb = tiocb->next)
+ complete_tiocb(queue, tiocb, err);
+
return queued;
}
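The detach-before-complete idiom deserves a closer look: the batch must be unlinked from the shared array, and the count reset, before any callback runs, since a callback may queue fresh requests into the very same slots. A self-contained sketch under hypothetical names (struct req, enqueue, complete, fail_all; illustrative only, not tapdisk code):

#include <assert.h>
#include <errno.h>
#include <stddef.h>

#define NSLOTS 8

struct req {
	struct req *next;
	int failed;
};

static struct req *slots[NSLOTS];
static int nqueued;
static struct req *requeue_me;	/* queued from inside the callback */

/* mirror queue_tiocb: chain onto the previous entry, then slot in */
static void
enqueue(struct req *r)
{
	r->next = NULL;
	if (nqueued)
		slots[nqueued - 1]->next = r;
	slots[nqueued++] = r;
}

static void
complete(struct req *r, int err)
{
	r->failed = (err != 0);
	if (requeue_me) {	/* the completion handler queues more work */
		enqueue(requeue_me);
		requeue_me = NULL;
	}
}

static int
fail_all(int err)
{
	/* detach the chain and reset the count first: complete() may
	 * overwrite slots[], but the ->next links keep the old batch
	 * reachable, which is the same trick cancel_tiocbs plays */
	struct req *r = nqueued ? slots[0] : NULL;
	int n = nqueued;

	nqueued = 0;

	for (; r != NULL; r = r->next)
		complete(r, err);

	return n;
}

int
main(void)
{
	struct req a = { NULL, 0 }, b = { NULL, 0 }, c = { NULL, 0 };

	enqueue(&a);
	enqueue(&b);
	requeue_me = &c;

	assert(fail_all(-EIO) == 2);		/* both originals failed */
	assert(a.failed && b.failed);
	assert(nqueued == 1 && slots[0] == &c);	/* and c was re-queued */

	return 0;
}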
ep = rwio->aio_events + i;
iocb = queue->iocbs[i];
ep->obj = iocb;
-
- pthread_mutex_unlock(&queue->mutex);
ep->res = tapdisk_rwio_rw(iocb);
- pthread_mutex_lock(&queue->mutex);
}
split = io_split(&queue->opioctx, rwio->aio_events, merged);
complete_tiocb(queue, tiocb, ep->res);
}
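+	/* the completions above may have freed ring slots; pull deferred tiocbs in */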
+ queue_deferred_tiocbs(queue);
+
return split;
}
struct io_event *ep;
tapdisk_lio_ack_event(queue);
- pthread_mutex_lock(&queue->mutex);
lio = queue->tio_data;
ret = io_getevents(lio->aio_ctx, 0,
complete_tiocb(queue, tiocb, ep->res);
}
- pthread_mutex_unlock(&queue->mutex);
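+	/* reaped completions may have freed queue slots; queue any deferred tiocbs */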
+ queue_deferred_tiocbs(queue);
}
static int
tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
tapdisk_lio_set_eventfd(queue, merged, queue->iocbs);
-
- pthread_mutex_unlock(&queue->mutex);
submitted = io_submit(lio->aio_ctx, merged, queue->iocbs);
- pthread_mutex_lock(&queue->mutex);
DBG("queued: %d, merged: %d, submitted: %d\n",
queue->queued, merged, submitted);
return err;
}
-static int
-tapdisk_queue_full(struct tqueue *queue)
-{
- return queue->tiocbs_pending + queue->queued >= queue->size;
-}
-
-static void
-tapdisk_queue_dequeue_tiocbs(struct tqueue *queue)
-{
- struct tiocb *tiocb, *next;
-
- /* NB. queue lock held */
-
- for_each_tiocb_safe(tiocb, next, &queue->waiting) {
- if (tapdisk_queue_full(queue))
- break;
-
- list_move_tail(&tiocb->entry, &queue->pending);
- queue->iocbs[queue->queued++] = &tiocb->iocb;
- }
-}
-
-/*
- * NB. Dispatcher thread.
- *
- * I/O may block. Not only synchronous I/O, but because I/O takes
- * memory, so even io_submit may block. The funny part about I/O
- * taking memory is that freeing memory typically takes I/O.
- *
- * Blktap makes sure that we're served from a bunch of private memory
- * pools to solve the general deadlock hazard. Unfortunately such
- * pools are not big enough to cover all tapdev I/O in flight.
- *
- * The consequence is that, once pools are exhausted, I/O submission
- * still blocks. It will immediately wake again once the pools are
- * refilled, but refilling them typically takes I/O completion.
- *
- * We run I/O completion from the main event loop. But submission
- * therefore must go separate.
- */
-
-static void*
-tapdisk_queue_thread_run(void *arg)
-{
- struct tqueue *queue = arg;
-
- pthread_mutex_lock(&queue->mutex);
- do {
- do {
- if (queue->closing)
- goto out;
-
- if (tapdisk_queue_full(queue))
- goto wait;
-
- if (!list_empty(&queue->waiting))
- break;
-
- wait:
- pthread_cond_wait(&queue->cond, &queue->mutex);
- } while (1);
-
- tapdisk_queue_dequeue_tiocbs(queue);
-
- queue->tio->tio_submit(queue);
- } while (1);
-
-out:
- pthread_mutex_unlock(&queue->mutex);
- return NULL;
-}
-
-static int
-tapdisk_queue_thread_start(struct tqueue *queue)
-{
- pthread_attr_t attr;
- int err;
-
- err = pthread_mutex_init(&queue->mutex, NULL);
- if (err)
- return -err;
-
- err = pthread_cond_init(&queue->cond, NULL);
- if (err)
- return -err;
-
- queue->closing = 0;
-
- pthread_attr_init(&attr);
- pthread_attr_setstacksize(&attr, 32<<10);
-
- err = pthread_create(&queue->thread, &attr,
- tapdisk_queue_thread_run, queue);
- if (err)
- return -err;
-
- return 0;
-}
-
-static void
-tapdisk_queue_thread_kick(struct tqueue *queue)
-{
- pthread_mutex_lock(&queue->mutex);
- pthread_cond_signal(&queue->cond);
- pthread_mutex_unlock(&queue->mutex);
-}
-
-static void
-tapdisk_queue_thread_exit(struct tqueue *queue)
-{
- if (queue->thread) {
- queue->closing = 1;
- tapdisk_queue_thread_kick(queue);
-
- pthread_join(queue->thread, NULL);
- queue->thread = 0;
- }
-}
-
int
tapdisk_init_queue(struct tqueue *queue, int size,
int drv, struct tfilter *filter)
queue->size = size;
queue->filter = filter;
- INIT_LIST_HEAD(&queue->waiting);
- INIT_LIST_HEAD(&queue->pending);
if (!size)
return 0;
if (err)
goto fail;
- err = tapdisk_queue_thread_start(queue);
- if (err)
- goto fail;
-
return 0;
fail:
{
tapdisk_queue_free_io(queue);
- tapdisk_queue_thread_exit(queue);
-
free(queue->iocbs);
queue->iocbs = NULL;
void
tapdisk_debug_queue(struct tqueue *queue)
{
- struct tiocb *tiocb;
+ struct tiocb *tiocb = queue->deferred.head;
WARN("TAPDISK QUEUE:\n");
WARN("size: %d, tio: %s, queued: %d, iocbs_pending: %d, "
- "tiocbs_pending: %d\n",
+ "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %llx\n",
queue->size, queue->tio->name, queue->queued, queue->iocbs_pending,
- queue->tiocbs_pending);
-
- pthread_mutex_lock(&queue->mutex);
-
- WARN("pending:\n");
- for_each_tiocb(tiocb, &queue->pending) {
- struct iocb *io = &tiocb->iocb;
- WARN("%s of %lu bytes at %lld\n",
- (io->aio_lio_opcode == IO_CMD_PWRITE ?
- "write" : "read"),
- io->u.c.nbytes, io->u.c.offset);
- }
-
- WARN("deferred:\n");
- for_each_tiocb(tiocb, &queue->waiting) {
- struct iocb *io = &tiocb->iocb;
- WARN("%s of %lu bytes at %lld\n",
- (io->aio_lio_opcode == IO_CMD_PWRITE ?
- "write" : "read"),
- io->u.c.nbytes, io->u.c.offset);
+ queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);
+
+ if (tiocb) {
+ WARN("deferred:\n");
+ for (; tiocb != NULL; tiocb = tiocb->next) {
+ struct iocb *io = &tiocb->iocb;
+ WARN("%s of %lu bytes at %lld\n",
+ (io->aio_lio_opcode == IO_CMD_PWRITE ?
+ "write" : "read"),
+ io->u.c.nbytes, io->u.c.offset);
+ }
}
-
- pthread_mutex_unlock(&queue->mutex);
}
void
tapdisk_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
- long long offset, int storage, td_queue_callback_t cb, void *arg)
+ long long offset, td_queue_callback_t cb, void *arg)
{
struct iocb *iocb = &tiocb->iocb;
iocb->data = tiocb;
tiocb->cb = cb;
tiocb->arg = arg;
-
- if (storage == TAPDISK_STORAGE_TYPE_EXT) {
- size_t block_size = 4<<10; /* should query the fs */
- tiocb->merge_limit = BLKTAP2_BIO_POOL_SIZE * block_size;
- } else
- /* contiguous storage:
- BLKTAP2_BIO_POOL_SIZE * BIO_MAX_PAGES * PAGE_SIZE */
- tiocb->merge_limit = SIZE_MAX;
+ tiocb->next = NULL;
}
void
tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
- pthread_mutex_lock(&queue->mutex);
- list_add_tail(&tiocb->entry, &queue->waiting);
- pthread_mutex_unlock(&queue->mutex);
+ if (!tapdisk_queue_full(queue))
+ queue_tiocb(queue, tiocb);
+ else
+ defer_tiocb(queue, tiocb);
}
-void
+
+/*
+ * fail_tiocbs may queue more tiocbs
+ */
+int
+tapdisk_submit_tiocbs(struct tqueue *queue)
+{
+ return queue->tio->tio_submit(queue);
+}
+
+int
tapdisk_submit_all_tiocbs(struct tqueue *queue)
{
- if (!list_empty(&queue->waiting))
- tapdisk_queue_thread_kick(queue);
+ int submitted = 0;
+
+ do {
+ submitted += tapdisk_submit_tiocbs(queue);
+ } while (!tapdisk_queue_empty(queue));
+
+ return submitted;
+}
+
+/*
+ * cancel_tiocbs may queue more tiocbs
+ */
+int
+tapdisk_cancel_tiocbs(struct tqueue *queue)
+{
+ return cancel_tiocbs(queue, -EIO);
+}
+
+int
+tapdisk_cancel_all_tiocbs(struct tqueue *queue)
+{
+ int cancelled = 0;
+
+ do {
+ cancelled += tapdisk_cancel_tiocbs(queue);
+ } while (!tapdisk_queue_empty(queue));
+
+ return cancelled;
}
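For reference, a hedged usage sketch of the resulting interface, assuming the declarations in tapdisk-queue.h (struct tqueue, struct tiocb, and a td_queue_callback_t of the form void (*)(void *arg, struct tiocb *, int err)); my_done, read_one_block, and the rw == 0 read convention are assumptions for illustration, not verbatim tapdisk code:

#include "tapdisk-queue.h"

/* hypothetical completion callback: count down outstanding requests */
static void
my_done(void *arg, struct tiocb *tiocb, int err)
{
	int *outstanding = arg;

	if (err) {
		/* log, retry, or fail the request upstream */
	}

	(*outstanding)--;
}

/* queue one 4K read at offset 0 and push everything out:
 * tapdisk_queue_tiocb defers transparently if the ring is full,
 * and tapdisk_submit_all_tiocbs resubmits until the queue drains */
static int
read_one_block(struct tqueue *queue, int fd, char *buf, int *outstanding)
{
	static struct tiocb tiocb;

	tapdisk_prep_tiocb(&tiocb, fd, 0 /* rw: read, assumed */,
			   buf, 4096, 0, my_done, outstanding);
	(*outstanding)++;
	tapdisk_queue_tiocb(queue, &tiocb);

	return tapdisk_submit_all_tiocbs(queue);
}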