ia64/xen-unstable

view tools/blktap2/drivers/tapdisk-queue.c @ 19647:1c627434605e

blktap2: a completely rewritten blktap implementation

Benefits of blktap2 over the old version of blktap:

* Isolation from xenstore - Blktap devices are now created directly on
the Linux dom0 command line, rather than being spawned in response
to XenStore events. This is handy for debugging, makes blktap
generally easier to work with, and is a step toward a generic
user-level block device implementation that is not Xen-specific.

* Improved tapdisk infrastructure: simpler request forwarding, new
request scheduler, request merging, more efficient use of AIO (see
the sketch after this list).

* Improved tapdisk error handling and memory management. No
allocations on the block data path, and IO retry logic to protect
guests from transient block device failures. This has been tested
and is known to work in weird environments such as NFS soft mounts.

* Pause and snapshot of live virtual disks (see xmsnap script).

* VHD support. The VHD code in this release has been rigorously
tested, and represents a very mature implementation of the VHD
image format.

* No more duplication of mechanism with blkback. The blktap kernel
module has changed dramatically from the original blktap. Blkback
is now always used to talk to Xen guests, blktap just presents a
Linux gendisk that blkback can export. This is done while
preserving the zero-copy data path from domU to physical device.
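
As a rough illustration of the new request path, here is a minimal
sketch of how a driver might drive the queue layer in
tapdisk-queue.c below (demo, my_cb, fd and buf are illustrative
placeholders; event-loop integration and error handling are elided):

  #include "tapdisk-queue.h"

  static void my_cb(void *arg, struct tiocb *tiocb, int err)
  {
          /* err is 0 on success, a -errno value, or -EIO */
  }

  static void demo(int fd, char *buf)
  {
          struct tqueue queue;
          static struct tiocb tiocb;      /* must outlive submission */

          /* 64 slots, async mode (sync == 0), no filter */
          tapdisk_init_queue(&queue, 64, 0, NULL);

          /* a 4096-byte read (rw == 0) at offset 0 */
          tapdisk_prep_tiocb(&tiocb, fd, 0, buf, 4096, 0, my_cb, NULL);

          /* deferred transparently if the queue is full */
          tapdisk_queue_tiocb(&queue, &tiocb);

          /* filter, merge and io_submit everything queued */
          tapdisk_submit_all_tiocbs(&queue);

          /* once queue.poll_fd signals: reap, split, run callbacks */
          tapdisk_complete_tiocbs(&queue);
  }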

These patches deprecate the old blktap code, which can hopefully be
removed from the tree completely at some point in the future.

Signed-off-by: Jake Wires <jake.wires@citrix.com>
Signed-off-by: Dutch Meyer <dmeyer@cs.ubc.ca>
author Keir Fraser <keir.fraser@citrix.com>
date Tue May 26 11:52:31 2009 +0100
children b7f73a7f3078
line source
/*
 * Copyright (c) 2008, XenSource Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of XenSource Inc. nor the names of its contributors
 *       may be used to endorse or promote products derived from this software
 *       without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <libaio.h>

#include "tapdisk.h"
#include "tapdisk-log.h"
#include "tapdisk-queue.h"
#include "tapdisk-filter.h"
#include "atomicio.h"

#define WARN(_f, _a...) tlog_write(TLOG_WARN, _f, ##_a)
#define DBG(_f, _a...) tlog_write(TLOG_DBG, _f, ##_a)
#define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)

/*
 * We used a kernel patch to return an fd associated with the AIO context
 * so that we can concurrently poll on synchronous and async descriptors.
 * This is signalled by passing 1 as the io context to io_setup.
 */
#define REQUEST_ASYNC_FD 1
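
/*
 * Slot an iocb into the submission array, chaining its tiocb to the
 * previously queued one through the iocb's data pointer so the whole
 * batch can later be walked as a list (see cancel_tiocbs()).
 */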
static inline void
queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
        struct iocb *iocb = &tiocb->iocb;

        if (queue->queued) {
                struct tiocb *prev = (struct tiocb *)
                        queue->iocbs[queue->queued - 1]->data;
                prev->next = tiocb;
        }

        queue->iocbs[queue->queued++] = iocb;
}

static inline int
deferred_tiocbs(struct tqueue *queue)
{
        return (queue->deferred.head != NULL);
}
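
/*
 * Queue full: park the tiocb on a FIFO list of deferred requests;
 * queue_deferred_tiocbs() re-queues them as slots open up.
 */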
static inline void
defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
        struct tlist *list = &queue->deferred;

        if (!list->head)
                list->head = list->tail = tiocb;
        else
                list->tail = list->tail->next = tiocb;

        queue->tiocbs_deferred++;
        queue->deferrals++;
}

static inline void
queue_deferred_tiocb(struct tqueue *queue)
{
        struct tlist *list = &queue->deferred;

        if (list->head) {
                struct tiocb *tiocb = list->head;

                list->head = tiocb->next;
                if (!list->head)
                        list->tail = NULL;

                queue_tiocb(queue, tiocb);
                queue->tiocbs_deferred--;
        }
}

static inline void
queue_deferred_tiocbs(struct tqueue *queue)
{
        while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue))
                queue_deferred_tiocb(queue);
}

/*
 * td_complete may queue more tiocbs
 */
static void
complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
{
        int err;
        struct iocb *iocb = &tiocb->iocb;

        if (res == iocb->u.c.nbytes)
                err = 0;
        else if ((int)res < 0)
                err = (int)res;
        else
                err = -EIO;

        tiocb->cb(tiocb->arg, tiocb, err);
}

static int
cancel_tiocbs(struct tqueue *queue, int err)
{
        int queued;
        struct tiocb *tiocb;

        if (!queue->queued)
                return 0;

        /*
         * td_complete may queue more tiocbs, which
         * will overwrite the contents of queue->iocbs.
         * use a private linked list to keep track
         * of the tiocbs we're cancelling.
         */
        tiocb  = (struct tiocb *)queue->iocbs[0]->data;
        queued = queue->queued;
        queue->queued = 0;

        for (; tiocb != NULL; tiocb = tiocb->next)
                complete_tiocb(queue, tiocb, err);

        return queued;
}

static int
fail_tiocbs(struct tqueue *queue, int succeeded, int total, int err)
{
        ERR(err, "io_submit error: %d of %d failed",
            total - succeeded, total);

        /* take any non-submitted, merged iocbs
         * off of the queue, split them, and fail them */
        queue->queued = io_expand_iocbs(&queue->opioctx,
                                        queue->iocbs, succeeded, total);

        return cancel_tiocbs(queue, err);
}
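
/*
 * Synchronous-mode emulation of one iocb: seek to the request offset
 * and let atomicio() retry short reads/writes until the transfer
 * completes or fails.
 */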
static inline ssize_t
iocb_rw(struct iocb *iocb)
{
        int fd        = iocb->aio_fildes;
        char *buf     = iocb->u.c.buf;
        long long off = iocb->u.c.offset;
        size_t size   = iocb->u.c.nbytes;
        ssize_t (*func)(int, void *, size_t) =
                (iocb->aio_lio_opcode == IO_CMD_PWRITE ? vwrite : read);

        if (lseek64(fd, off, SEEK_SET) == (off64_t)-1)
                return -errno;

        if (atomicio(func, fd, buf, size) != size)
                return -errno;

        return size;
}
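
/*
 * Synchronous submission path: filter and merge the queued iocbs,
 * execute each merged iocb in-line via iocb_rw(), then split the
 * results back into per-tiocb events and run their callbacks.
 */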
static int
io_synchronous_rw(struct tqueue *queue)
{
        int i, merged, split;
        struct iocb *iocb;
        struct tiocb *tiocb;
        struct io_event *ep;

        if (!queue->queued)
                return 0;

        tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
        merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);

        queue->queued = 0;

        for (i = 0; i < merged; i++) {
                ep      = queue->aio_events + i;
                iocb    = queue->iocbs[i];
                ep->obj = iocb;
                ep->res = iocb_rw(iocb);
        }

        split = io_split(&queue->opioctx, queue->aio_events, merged);
        tapdisk_filter_events(queue->filter, queue->aio_events, split);

        for (i = split, ep = queue->aio_events; i-- > 0; ep++) {
                iocb  = ep->obj;
                tiocb = (struct tiocb *)iocb->data;
                complete_tiocb(queue, tiocb, ep->res);
        }

        queue_deferred_tiocbs(queue);

        return split;
}
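
/*
 * Initialize a queue with room for @size iocbs.  In sync mode a
 * dummy pipe supplies a poll fd that never fires; in async mode
 * io_setup() (with the REQUEST_ASYNC_FD hack above) returns a
 * pollable fd for AIO completions.
 */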
int
tapdisk_init_queue(struct tqueue *queue, int size,
                   int sync, struct tfilter *filter)
{
        int i, err;

        memset(queue, 0, sizeof(struct tqueue));

        queue->size   = size;
        queue->sync   = sync;
        queue->filter = filter;

        if (sync) {
                /* set up a pipe so we can return
                 * a poll fd that won't fire. */
                if (pipe(queue->dummy_pipe))
                        return -errno;
                queue->poll_fd = queue->dummy_pipe[0];
        } else {
                queue->aio_ctx = (io_context_t)REQUEST_ASYNC_FD;
                queue->poll_fd = io_setup(size, &queue->aio_ctx);

                if (queue->poll_fd < 0) {
                        if (queue->poll_fd == -EAGAIN)
                                DPRINTF("Couldn't setup AIO context. If you "
                                        "are trying to concurrently use a "
                                        "large number of blktap-based disks, "
                                        "you may need to increase the "
                                        "system-wide aio request limit. "
                                        "(e.g. 'echo 1048576 > /proc/sys/fs/"
                                        "aio-max-nr')\n");
                        else
                                DPRINTF("Couldn't get fd for AIO poll "
                                        "support. This is probably because "
                                        "your kernel does not have the "
                                        "aio-poll patch applied.\n");
                        return queue->poll_fd;
                }
        }

        err               = -ENOMEM;
        queue->iocbs      = calloc(size, sizeof(struct iocb *));
        queue->aio_events = calloc(size, sizeof(struct io_event));
        if (!queue->iocbs || !queue->aio_events)
                goto fail;

        err = opio_init(&queue->opioctx, size);
        if (err)
                goto fail;

        return 0;

 fail:
        tapdisk_free_queue(queue);
        return err;
}

void
tapdisk_free_queue(struct tqueue *queue)
{
        if (queue->sync) {
                close(queue->dummy_pipe[0]);
                close(queue->dummy_pipe[1]);
        } else
                io_destroy(queue->aio_ctx);

        free(queue->iocbs);
        free(queue->aio_events);
        opio_free(&queue->opioctx);
}

void
tapdisk_debug_queue(struct tqueue *queue)
{
        struct tiocb *tiocb = queue->deferred.head;

        WARN("TAPDISK QUEUE:\n");
        WARN("size: %d, sync: %d, queued: %d, iocbs_pending: %d, "
             "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %"PRIx64"\n",
             queue->size, queue->sync, queue->queued, queue->iocbs_pending,
             queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);

        if (tiocb) {
                WARN("deferred:\n");
                for (; tiocb != NULL; tiocb = tiocb->next) {
                        struct iocb *io = &tiocb->iocb;
                        WARN("%s of %lu bytes at %lld\n",
                             (io->aio_lio_opcode == IO_CMD_PWRITE ?
                              "write" : "read"),
                             io->u.c.nbytes, io->u.c.offset);
                }
        }
}
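
/*
 * Fill in a tiocb: rw != 0 prepares a pwrite, rw == 0 a pread.  The
 * completion callback and its argument are stashed in the tiocb, and
 * iocb->data points back at the tiocb.
 */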
void
tapdisk_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
                   long long offset, td_queue_callback_t cb, void *arg)
{
        struct iocb *iocb = &tiocb->iocb;

        if (rw)
                io_prep_pwrite(iocb, fd, buf, size, offset);
        else
                io_prep_pread(iocb, fd, buf, size, offset);

        iocb->data  = tiocb;
        tiocb->cb   = cb;
        tiocb->arg  = arg;
        tiocb->next = NULL;
}

void
tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
        if (!tapdisk_queue_full(queue))
                queue_tiocb(queue, tiocb);
        else
                defer_tiocb(queue, tiocb);
}

/*
 * fail_tiocbs may queue more tiocbs
 */
int
tapdisk_submit_tiocbs(struct tqueue *queue)
{
        int merged, submitted, err = 0;

        if (!queue->queued)
                return 0;

        if (queue->sync)
                return io_synchronous_rw(queue);

        tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
        merged    = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
        submitted = io_submit(queue->aio_ctx, merged, queue->iocbs);

        DBG("queued: %d, merged: %d, submitted: %d\n",
            queue->queued, merged, submitted);

        if (submitted < 0) {
                err = submitted;
                submitted = 0;
        } else if (submitted < merged)
                err = -EIO;

        queue->iocbs_pending  += submitted;
        queue->tiocbs_pending += queue->queued;
        queue->queued          = 0;

        if (err)
                queue->tiocbs_pending -=
                        fail_tiocbs(queue, submitted, merged, err);

        return submitted;
}
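
/* Submit in batches until nothing is left queued. */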
int
tapdisk_submit_all_tiocbs(struct tqueue *queue)
{
        int submitted = 0;

        do {
                submitted += tapdisk_submit_tiocbs(queue);
        } while (!tapdisk_queue_empty(queue));

        return submitted;
}
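
/*
 * Reap AIO completions, split merged iocbs back into their original
 * tiocbs and run their callbacks, then move deferred tiocbs onto the
 * slots just freed.
 */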
int
tapdisk_complete_tiocbs(struct tqueue *queue)
{
        int i, ret, split;
        struct iocb *iocb;
        struct tiocb *tiocb;
        struct io_event *ep;

        ret   = io_getevents(queue->aio_ctx, 0,
                             queue->size, queue->aio_events, NULL);
        split = io_split(&queue->opioctx, queue->aio_events, ret);
        tapdisk_filter_events(queue->filter, queue->aio_events, split);

        DBG("events: %d, tiocbs: %d\n", ret, split);

        queue->iocbs_pending  -= ret;
        queue->tiocbs_pending -= split;

        for (i = split, ep = queue->aio_events; i-- > 0; ep++) {
                iocb  = ep->obj;
                tiocb = (struct tiocb *)iocb->data;
                complete_tiocb(queue, tiocb, ep->res);
        }

        queue_deferred_tiocbs(queue);

        return split;
}

/*
 * cancel_tiocbs may queue more tiocbs
 */
int
tapdisk_cancel_tiocbs(struct tqueue *queue)
{
        return cancel_tiocbs(queue, -EIO);
}

int
tapdisk_cancel_all_tiocbs(struct tqueue *queue)
{
        int cancelled = 0;

        do {
                cancelled += tapdisk_cancel_tiocbs(queue);
        } while (!tapdisk_queue_empty(queue));

        return cancelled;
}