#include <string.h>
#include <assert.h>
#include <unistd.h>
+#include <sys/mman.h>
#include "list.h"
#include "scheduler.h"
-#include "tapdisk-vbd.h"
+#include "tapdisk.h"
#include "tapdisk-server.h"
#include "tapdisk-disktype.h"
#define POLL_WRITE 1
#define MIN(a, b) ((a) < (b) ? (a) : (b))
+#define BUG(_cond) td_panic()
+#define BUG_ON(_cond) if (unlikely(_cond)) { td_panic(); }
-struct tapdisk_stream_poll {
- int pipe[2];
- int set;
-};
+#define TD_STREAM_MAX_REQS 16
+#define TD_STREAM_REQ_SIZE (sysconf(_SC_PAGE_SIZE) * 32)
+
+typedef struct tapdisk_stream_request td_stream_req_t;
+typedef struct tapdisk_stream td_stream_t;
struct tapdisk_stream_request {
- uint64_t sec;
- uint32_t secs;
- uint64_t seqno;
- blkif_request_t blkif_req;
- struct list_head next;
+ void *buf;
+ struct td_iovec iov;
+ td_vbd_request_t vreq;
+ struct list_head entry;
};
struct tapdisk_stream {
int err;
- uint64_t cur;
- uint64_t start;
- uint64_t end;
-
- uint64_t started;
- uint64_t completed;
+ td_sector_t sec_in;
+ td_sector_t sec_out;
+ uint64_t count;
- struct tapdisk_stream_poll poll;
- event_id_t enqueue_event_id;
-
- struct list_head free_list;
struct list_head pending_list;
struct list_head completed_list;
- struct tapdisk_stream_request requests[MAX_REQUESTS];
+ td_stream_req_t reqs[TD_STREAM_MAX_REQS];
+ td_stream_req_t *free[TD_STREAM_MAX_REQS];
+ int n_free;
+
+ int poll_fd;
+ event_id_t poll_id;
};
static unsigned int tapdisk_stream_count;
-static void tapdisk_stream_close_image(struct tapdisk_stream *);
+static void tapdisk_stream_close_image(td_stream_t *);
static void
usage(const char *app, int err)
exit(err);
}
-static inline void
-tapdisk_stream_poll_initialize(struct tapdisk_stream_poll *p)
+static inline int
+tapdisk_stream_stop(td_stream_t *s)
{
- p->set = 0;
- p->pipe[POLL_READ] = p->pipe[POLL_WRITE] = -1;
+ return (list_empty(&s->pending_list) && (!s->count || s->err));
}
static int
-tapdisk_stream_poll_open(struct tapdisk_stream_poll *p)
+tapdisk_stream_req_create(td_stream_req_t *req)
{
- int err;
-
- tapdisk_stream_poll_initialize(p);
+ int prot, flags;
- err = pipe(p->pipe);
- if (err)
- return -errno;
+ memset(req, 0, sizeof(*req));
+ INIT_LIST_HEAD(&req->entry);
- err = fcntl(p->pipe[POLL_READ], F_SETFL, O_NONBLOCK);
- if (err)
- goto out;
+ prot = PROT_READ|PROT_WRITE;
+ flags = MAP_ANONYMOUS|MAP_PRIVATE;
- err = fcntl(p->pipe[POLL_WRITE], F_SETFL, O_NONBLOCK);
- if (err)
- goto out;
+ req->buf = mmap(NULL, TD_STREAM_REQ_SIZE, prot, flags, -1, 0);
+ if (req->buf == MAP_FAILED) {
+ req->buf = NULL;
+ return -errno;
+ }
return 0;
-
-out:
- close(p->pipe[POLL_READ]);
- close(p->pipe[POLL_WRITE]);
- tapdisk_stream_poll_initialize(p);
- return -errno;
}
static void
-tapdisk_stream_poll_close(struct tapdisk_stream_poll *p)
+tapdisk_stream_req_destroy(td_stream_req_t *req)
{
- if (p->pipe[POLL_READ] != -1)
- close(p->pipe[POLL_READ]);
- if (p->pipe[POLL_WRITE] != -1)
- close(p->pipe[POLL_WRITE]);
- tapdisk_stream_poll_initialize(p);
+ if (req->buf) {
+ int err = munmap(req->iov.base, TD_STREAM_REQ_SIZE);
+ BUG_ON(err);
+ req->iov.base = NULL;
+ }
}
-static inline void
-tapdisk_stream_poll_clear(struct tapdisk_stream_poll *p)
+td_stream_req_t *
+tapdisk_stream_alloc_req(td_stream_t *s)
{
- int gcc, dummy;
+ td_stream_req_t *req = NULL;
- gcc = read(p->pipe[POLL_READ], &dummy, sizeof(dummy));
- p->set = 0;
-}
+ if (likely(s->n_free))
+ req = s->free[--s->n_free];
-static inline void
-tapdisk_stream_poll_set(struct tapdisk_stream_poll *p)
-{
- int dummy = 0;
-
- if (!p->set) {
- int gcc = write(p->pipe[POLL_WRITE], &dummy, sizeof(dummy));
- p->set = 1;
- }
+ return req;
}
-static inline int
-tapdisk_stream_stop(struct tapdisk_stream *s)
+void
+tapdisk_stream_free_req(td_stream_t *s, td_stream_req_t *req)
{
- return (list_empty(&s->pending_list) && (s->cur == s->end || s->err));
+ BUG_ON(s->n_free >= MAX_REQUESTS);
+ BUG_ON(!list_empty(&req->entry));
+ s->free[s->n_free++] = req;
}
-static inline void
-tapdisk_stream_initialize_request(struct tapdisk_stream_request *req)
+static void
+tapdisk_stream_destroy_reqs(td_stream_t *s)
{
- memset(req, 0, sizeof(*req));
- INIT_LIST_HEAD(&req->next);
-}
+ td_stream_req_t *req;
-static inline int
-tapdisk_stream_request_idx(struct tapdisk_stream *s,
- struct tapdisk_stream_request *req)
-{
- return (req - s->requests);
+ do {
+ req = tapdisk_stream_alloc_req(s);
+ if (!req)
+ break;
+
+ tapdisk_stream_req_destroy(req);
+ } while (1);
}
-static inline struct tapdisk_stream_request *
-tapdisk_stream_get_request(struct tapdisk_stream *s)
+static int
+tapdisk_stream_create_reqs(td_stream_t *s)
{
- struct tapdisk_stream_request *req;
+ size_t size;
+ void *buf;
+ int i, err;
- if (list_empty(&s->free_list))
- return NULL;
+ s->n_free = 0;
- req = list_entry(s->free_list.next,
- struct tapdisk_stream_request, next);
+ for (i = 0; i < TD_STREAM_MAX_REQS; i++) {
+ td_stream_req_t *req = &s->reqs[i];
- list_del_init(&req->next);
- tapdisk_stream_initialize_request(req);
+ err = tapdisk_stream_req_create(req);
+ if (err)
+ goto fail;
- return req;
+ tapdisk_stream_free_req(s, req);
+ }
+
+ return 0;
+
+fail:
+ tapdisk_stream_destroy_reqs(s);
+ return err;
}
-static void
-tapdisk_stream_print_request(struct tapdisk_stream *s,
- struct tapdisk_stream_request *sreq)
+static int
+tapdisk_stream_print_request(td_stream_t *s, td_stream_req_t *req)
{
- unsigned long idx = (unsigned long)tapdisk_stream_request_idx(s, sreq);
- char *buf = (char *)MMAP_VADDR(s->vbd->ring.vstart, idx, 0);
- int gcc = write(s->out_fd, buf, sreq->secs << SECTOR_SHIFT);
+ struct td_iovec *iov = &req->iov;
+
+ int gcc = write(s->out_fd, iov->base, iov->secs << SECTOR_SHIFT);
+
+ return iov->secs;
}
static void
-tapdisk_stream_write_data(struct tapdisk_stream *s)
+tapdisk_stream_write_data(td_stream_t *s)
{
- struct tapdisk_stream_request *sreq, *tmp;
+ td_stream_req_t *req, *next;
- list_for_each_entry_safe(sreq, tmp, &s->completed_list, next) {
- if (sreq->seqno != s->completed)
+ list_for_each_entry_safe(req, next, &s->completed_list, entry) {
+ if (req->vreq.sec != s->sec_out)
break;
- s->completed++;
- tapdisk_stream_print_request(s, sreq);
+ s->sec_out += tapdisk_stream_print_request(s, req);
- list_del_init(&sreq->next);
- list_add_tail(&sreq->next, &s->free_list);
+ list_del_init(&req->entry);
+ tapdisk_stream_free_req(s, req);
}
}
static inline void
-tapdisk_stream_queue_completed(struct tapdisk_stream *s,
- struct tapdisk_stream_request *sreq)
+tapdisk_stream_queue_completed(td_stream_t *s, td_stream_req_t *req)
{
- struct tapdisk_stream_request *itr;
+ td_stream_req_t *itr;
- list_for_each_entry(itr, &s->completed_list, next)
- if (sreq->seqno < itr->seqno) {
- list_add_tail(&sreq->next, &itr->next);
- return;
- }
+ list_for_each_entry(itr, &s->completed_list, entry)
+ if (req->vreq.sec < itr->vreq.sec)
+ break;
- list_add_tail(&sreq->next, &s->completed_list);
+ list_add_tail(&req->entry, &itr->entry);
}
static void
-tapdisk_stream_dequeue(void *arg, blkif_response_t *rsp)
+tapdisk_stream_complete_request(td_stream_t *s, td_stream_req_t *req,
+ int error, int final)
{
- struct tapdisk_stream *s = (struct tapdisk_stream *)arg;
- struct tapdisk_stream_request *sreq = s->requests + rsp->id;
-
- list_del_init(&sreq->next);
+ list_del_init(&req->entry);
- if (rsp->status == BLKIF_RSP_OKAY)
- tapdisk_stream_queue_completed(s, sreq);
+ if (likely(!error))
+ tapdisk_stream_queue_completed(s, req);
else {
s->err = EIO;
- list_add_tail(&sreq->next, &s->free_list);
- fprintf(stderr, "error reading sector 0x%llx\n", sreq->sec);
+ tapdisk_stream_free_req(s, req);
+ fprintf(stderr, "error reading sector 0x%llx\n", req->vreq.sec);
}
- tapdisk_stream_write_data(s);
- tapdisk_stream_poll_set(&s->poll);
-}
-
-static void
-tapdisk_stream_enqueue(event_id_t id, char mode, void *arg)
-{
- td_vbd_t *vbd;
- int i, idx, psize;
- struct tapdisk_stream *s = (struct tapdisk_stream *)arg;
+ if (!final)
+ return;
- vbd = s->vbd;
- tapdisk_stream_poll_clear(&s->poll);
+ tapdisk_stream_write_data(s);
if (tapdisk_stream_stop(s)) {
tapdisk_stream_close_image(s);
return;
}
- psize = getpagesize();
+ tapdisk_server_mask_event(s->poll_id, 0);
+}
- while (s->cur < s->end && !s->err) {
- blkif_request_t *breq;
- td_vbd_request_t *vreq;
- struct tapdisk_stream_request *sreq;
+static void
+__tapdisk_stream_request_cb(td_vbd_request_t *vreq, int error,
+ void *token, int final)
+{
+ td_stream_req_t *req = containerof(vreq, td_stream_req_t, vreq);
+ td_stream_t *s = token;
- sreq = tapdisk_stream_get_request(s);
- if (!sreq)
- break;
+ tapdisk_stream_complete_request(s, req, error, final);
+}
- idx = tapdisk_stream_request_idx(s, sreq);
+static void
+tapdisk_stream_queue_request(td_stream_t *s, td_stream_req_t *req)
+{
+ td_vbd_request_t *vreq;
+ struct td_iovec *iov;
+ int secs, err;
- sreq->sec = s->cur;
- sreq->secs = 0;
- sreq->seqno = s->started++;
+ iov = &req->iov;
+ secs = MIN(TD_STREAM_REQ_SIZE >> SECTOR_SHIFT, s->count);
- breq = &sreq->blkif_req;
- breq->id = idx;
- breq->nr_segments = 0;
- breq->sector_number = sreq->sec;
- breq->operation = BLKIF_OP_READ;
+ iov->base = req->buf;
+ iov->secs = secs;
- for (i = 0; i < BLKIF_MAX_SEGMENTS_PER_REQUEST; i++) {
- uint32_t secs = MIN(s->end - s->cur, psize >> SECTOR_SHIFT);
- struct blkif_request_segment *seg = breq->seg + i;
+ vreq = &req->vreq;
+ vreq->iov = iov;
+ vreq->iovcnt = 1;
+ vreq->sec = s->sec_in;
+ vreq->op = TD_OP_READ;
+ vreq->name = NULL;
+ vreq->token = s;
+ vreq->cb = __tapdisk_stream_request_cb;
- if (!secs)
- break;
+ s->count -= secs;
+ s->sec_in += secs;
- sreq->secs += secs;
- s->cur += secs;
+ err = tapdisk_vbd_queue_request(s->vbd, vreq);
+ if (err)
+ tapdisk_stream_complete_request(s, req, err, 1);
- seg->first_sect = 0;
- seg->last_sect = secs - 1;
- breq->nr_segments++;
- }
+ list_add_tail(&req->entry, &s->pending_list);
+}
- vreq = vbd->request_list + idx;
+static void
+tapdisk_stream_queue_requests(td_stream_t *s)
+{
- assert(list_empty(&vreq->next));
- assert(vreq->secs_pending == 0);
+ while (s->count && !s->err) {
+ td_stream_req_t *req;
- memcpy(&vreq->req, breq, sizeof(*breq));
- vbd->received++;
- vreq->vbd = vbd;
+ req = tapdisk_stream_alloc_req(s);
+ if (!req) {
+ tapdisk_server_mask_event(s->poll_id, 1);
+ break;
+ }
- tapdisk_vbd_move_request(vreq, &vbd->new_requests);
- list_add_tail(&sreq->next, &s->pending_list);
+ tapdisk_stream_queue_request(s, req);
}
-
- tapdisk_vbd_issue_requests(vbd);
}
static int
goto out;
}
- tapdisk_vbd_set_callback(s->vbd, tapdisk_stream_dequeue, s);
-
err = tapdisk_vbd_open_vdi(s->vbd, name, TD_OPEN_RDONLY, -1);
if (err)
goto out;
}
static void
-tapdisk_stream_close_image(struct tapdisk_stream *s)
+tapdisk_stream_close_image(td_stream_t *s)
{
td_vbd_t *vbd;
if (vbd) {
tapdisk_vbd_close_vdi(vbd);
tapdisk_server_remove_vbd(vbd);
- free((void *)vbd->ring.vstart);
free(vbd->name);
free(vbd);
s->vbd = NULL;
}
static int
-tapdisk_stream_set_position(struct tapdisk_stream *s,
+tapdisk_stream_set_position(td_stream_t *s,
uint64_t count, uint64_t skip)
{
int err;
return err;
}
- if (count == (uint64_t)-1)
+ if (count == -1LL)
count = info.size - skip;
if (count + skip > info.size) {
return -EINVAL;
}
- s->start = skip;
- s->cur = s->start;
- s->end = s->start + count;
+ s->sec_in = skip;
+ s->sec_out = skip;
+ s->count = count;
return 0;
}
-static int
-tapdisk_stream_initialize_requests(struct tapdisk_stream *s)
+void
+__tapdisk_stream_event_cb(event_id_t id, char mode, void *arg)
{
- size_t size;
- td_ring_t *ring;
- int err, i, psize;
-
- ring = &s->vbd->ring;
- psize = getpagesize();
- size = psize * BLKTAP_MMAP_REGION_SIZE;
+ td_stream_t *s = arg;
+ tapdisk_stream_queue_requests(s);
+}
- /* sneaky -- set up ring->vstart so tapdisk_vbd will use our buffers */
- err = posix_memalign((void **)&ring->vstart, psize, size);
- if (err) {
- fprintf(stderr, "failed to allocate buffers: %d\n", err);
- ring->vstart = 0;
- return err;
+static void
+tapdisk_stream_destroy_event(struct tapdisk_stream *s)
+{
+ if (s->poll_id >= 0) {
+ tapdisk_server_unregister_event(s->poll_id);
+ s->poll_id = 0;
}
- for (i = 0; i < MAX_REQUESTS; i++) {
- struct tapdisk_stream_request *req = s->requests + i;
- tapdisk_stream_initialize_request(req);
- list_add_tail(&req->next, &s->free_list);
+ if (s->poll_fd >= 0) {
+ close(s->poll_fd);
+ s->poll_fd = -1;
}
-
- return 0;
}
static int
-tapdisk_stream_register_enqueue_event(struct tapdisk_stream *s)
+tapdisk_stream_create_event(td_stream_t *s)
{
+ int fd[2];
int err;
- struct tapdisk_stream_poll *p = &s->poll;
- err = tapdisk_stream_poll_open(p);
- if (err)
- goto out;
+ s->poll_id = -1;
+ s->poll_fd = -1;
- err = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
- p->pipe[POLL_READ], 0,
- tapdisk_stream_enqueue, s);
- if (err < 0)
- goto out;
+ err = pipe(fd);
+ if (err) {
+ err = -errno;
+ goto fail;
+ }
- s->enqueue_event_id = err;
- err = 0;
+ s->poll_fd = fd[0];
-out:
+ err = close(fd[1]);
+ BUG_ON(err);
+
+ s->poll_id =
+ tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+ s->poll_fd, 0,
+ __tapdisk_stream_event_cb, s);
+ if (s->poll_id < 0)
+ goto fail;
+
+ return 0;
+
+fail:
if (err)
fprintf(stderr, "failed to register event: %d\n", err);
- return err;
-}
-static void
-tapdisk_stream_unregister_enqueue_event(struct tapdisk_stream *s)
-{
- if (s->enqueue_event_id) {
- tapdisk_server_unregister_event(s->enqueue_event_id);
- s->enqueue_event_id = 0;
- }
- tapdisk_stream_poll_close(&s->poll);
-}
+ tapdisk_stream_destroy_event(s);
-static inline void
-tapdisk_stream_initialize(struct tapdisk_stream *s)
-{
- memset(s, 0, sizeof(*s));
- s->in_fd = s->out_fd = -1;
- INIT_LIST_HEAD(&s->free_list);
- INIT_LIST_HEAD(&s->pending_list);
- INIT_LIST_HEAD(&s->completed_list);
+ return err;
}
static int
return 0;
}
-static int
-tapdisk_stream_open(struct tapdisk_stream *s, const char *name,
- uint64_t count, uint64_t skip)
+static void
+tapdisk_stream_close(struct tapdisk_stream *s)
{
- int err;
+ tapdisk_stream_destroy_event(s);
- tapdisk_stream_initialize(s);
+ tapdisk_stream_destroy_reqs(s);
- err = tapdisk_stream_open_fds(s);
- if (err)
- return err;
+ tapdisk_stream_close_image(s);
- err = tapdisk_stream_open_image(s, name);
- if (err)
- return err;
+ if (s->out_fd >= 0) {
+ close(s->out_fd);
+ s->out_fd = -1;
+ }
+}
- err = tapdisk_stream_set_position(s, count, skip);
- if (err)
- return err;
+static int
+tapdisk_stream_open(struct tapdisk_stream *s, const char *name,
+ uint64_t count, uint64_t skip)
+{
+ int err = 0;
- err = tapdisk_stream_initialize_requests(s);
- if (err)
- return err;
+ memset(s, 0, sizeof(*s));
+ s->in_fd = s->out_fd = -1;
+ INIT_LIST_HEAD(&s->pending_list);
+ INIT_LIST_HEAD(&s->completed_list);
- err = tapdisk_stream_register_enqueue_event(s);
- if (err)
- return err;
+ if (!err)
+ err = tapdisk_stream_open_fds(s);
+ if (!err)
+ err = tapdisk_stream_open_image(s, name);
+ if (!err)
+ err = tapdisk_stream_set_position(s, count, skip);
+ if (!err)
+ err = tapdisk_stream_create_reqs(s);
+ if (!err)
+ err = tapdisk_stream_create_event(s);
- return 0;
-}
+ if (err)
+ tapdisk_stream_close(s);
-static void
-tapdisk_stream_release(struct tapdisk_stream *s)
-{
- close(s->out_fd);
- tapdisk_stream_close_image(s);
- tapdisk_stream_unregister_enqueue_event(s);
+ return err;
}
static int
tapdisk_stream_run(struct tapdisk_stream *s)
{
- tapdisk_stream_enqueue(s->enqueue_event_id, SCHEDULER_POLL_READ_FD, s);
+ tapdisk_stream_queue_requests(s);
tapdisk_server_run();
return s->err;
}
err = 0;
out:
- tapdisk_stream_release(&stream);
+ tapdisk_stream_close(&stream);
tapdisk_stop_logging();
return err;
}