td_stream_req_t reqs[TD_STREAM_MAX_REQS];
td_stream_req_t *free[TD_STREAM_MAX_REQS];
int n_free;
-
- int poll_fd;
- event_id_t poll_id;
};
static unsigned int tapdisk_stream_count;
static void tapdisk_stream_close_image(td_stream_t *);
+static void tapdisk_stream_queue_requests(td_stream_t *);
static void
usage(const char *app, int err)
return;
}
- tapdisk_server_mask_event(s->poll_id, 0);
+ tapdisk_stream_queue_requests(s);
}
static void
td_stream_req_t *req;
req = tapdisk_stream_alloc_req(s);
- if (!req) {
- tapdisk_server_mask_event(s->poll_id, 1);
+ if (!req)
break;
- }
tapdisk_stream_queue_request(s, req);
}
__tapdisk_stream_event_cb(event_id_t id, char mode, void *arg)
{
td_stream_t *s = arg;
- tapdisk_stream_queue_requests(s);
-}
-
-static void
-tapdisk_stream_destroy_event(struct tapdisk_stream *s)
-{
- if (s->poll_id >= 0) {
- tapdisk_server_unregister_event(s->poll_id);
- s->poll_id = 0;
- }
-
- if (s->poll_fd >= 0) {
- close(s->poll_fd);
- s->poll_fd = -1;
- }
-}
-
-static int
-tapdisk_stream_create_event(td_stream_t *s)
-{
- int fd[2];
- int err;
-
- s->poll_id = -1;
- s->poll_fd = -1;
-
- err = pipe(fd);
- if (err) {
- err = -errno;
- goto fail;
- }
-
- s->poll_fd = fd[0];
-
- err = close(fd[1]);
- BUG_ON(err);
-
- s->poll_id =
- tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
- s->poll_fd, 0,
- __tapdisk_stream_event_cb, s);
- if (s->poll_id < 0)
- goto fail;
-
- return 0;
-
-fail:
- if (err)
- fprintf(stderr, "failed to register event: %d\n", err);
-
- tapdisk_stream_destroy_event(s);
-
- return err;
}
static int
static void
tapdisk_stream_close(struct tapdisk_stream *s)
{
- tapdisk_stream_destroy_event(s);
-
tapdisk_stream_destroy_reqs(s);
tapdisk_stream_close_image(s);
err = tapdisk_stream_set_position(s, count, skip);
if (!err)
err = tapdisk_stream_create_reqs(s);
- if (!err)
- err = tapdisk_stream_create_event(s);
if (err)
tapdisk_stream_close(s);