static struct xenstore_domain_interface *xenstore_buf;
static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
static spinlock_t xb_lock = SPIN_LOCK_UNLOCKED; /* protects xenbus req ring */
-DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
struct xenbus_event_queue xenbus_default_watch_queue;
struct watch {
static MINIOS_LIST_HEAD(, watch) watches;
struct xenbus_req_info
{
- int in_use:1;
- struct wait_queue_head waitq;
+ struct xenbus_event_queue *reply_queue; /* non-0 iff in use */
+ struct xenbus_event *for_queue;
void *reply;
};
void xenbus_event_queue_init(struct xenbus_event_queue *queue)
{
MINIOS_STAILQ_INIT(&queue->events);
+ init_waitqueue_head(&queue->waitq);
+}
+
+static struct xenbus_event *remove_event(struct xenbus_event_queue *queue)
+{
+ struct xenbus_event *event;
+
+ event = MINIOS_STAILQ_FIRST(&queue->events);
+ if (!event)
+ goto out;
+ MINIOS_STAILQ_REMOVE_HEAD(&queue->events, entry);
+
+ out:
+ return event;
+}
+
+static void queue_event(struct xenbus_event_queue *queue,
+ struct xenbus_event *event)
+{
+ MINIOS_STAILQ_INSERT_TAIL(&queue->events, event, entry);
+ wake_up(&queue->waitq);
+}
+
+static struct xenbus_event *await_event(struct xenbus_event_queue *queue)
+{
+ struct xenbus_event *event;
+ DEFINE_WAIT(w);
+ while (!(event = remove_event(queue))) {
+ add_waiter(w, queue->waitq);
+ schedule();
+ }
+ remove_waiter(w, queue->waitq);
+ return event;
}
char **xenbus_wait_for_watch_return(struct xenbus_event_queue *queue)
{
struct xenbus_event *event;
- DEFINE_WAIT(w);
if (!queue)
queue = &xenbus_default_watch_queue;
- while (!(event = MINIOS_STAILQ_FIRST(&queue->events))) {
- add_waiter(w, xenbus_watch_queue);
- schedule();
- }
- remove_waiter(w, xenbus_watch_queue);
- MINIOS_STAILQ_REMOVE_HEAD(&queue->events, entry);
+ event = await_event(queue);
return &event->path;
}
}
if (events) {
- MINIOS_STAILQ_INSERT_TAIL(&events->events, event, entry);
- wake_up(&xenbus_watch_queue);
+ queue_event(events, event);
} else {
printk("unexpected watch token %s\n", event->token);
free(event);
MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
msg.len + sizeof(msg));
xenstore_buf->rsp_cons += msg.len + sizeof(msg);
- wake_up(&req_info[msg.req_id].waitq);
+ queue_event(req_info[msg.req_id].reply_queue,
+ req_info[msg.req_id].for_queue);
}
}
}
/* Release a xenbus identifier */
static void release_xenbus_id(int id)
{
- BUG_ON(!req_info[id].in_use);
+ BUG_ON(!req_info[id].reply_queue);
spin_lock(&req_lock);
- req_info[id].in_use = 0;
+ req_info[id].reply_queue = 0;
nr_live_reqs--;
- req_info[id].in_use = 0;
if (nr_live_reqs == NR_REQS - 1)
wake_up(&req_wq);
spin_unlock(&req_lock);
/* Allocate an identifier for a xenbus request. Blocks if none are
available. */
-static int allocate_xenbus_id(void)
+static int allocate_xenbus_id(struct xenbus_event_queue *reply_queue,
+ struct xenbus_event *for_queue)
{
static int probe;
int o_probe;
o_probe = probe;
for (;;)
{
- if (!req_info[o_probe].in_use)
+ if (!req_info[o_probe].reply_queue)
break;
o_probe = (o_probe + 1) % NR_REQS;
BUG_ON(o_probe == probe);
}
nr_live_reqs++;
- req_info[o_probe].in_use = 1;
+ req_info[o_probe].reply_queue = reply_queue;
+ req_info[o_probe].for_queue = for_queue;
probe = (o_probe + 1) % NR_REQS;
spin_unlock(&req_lock);
- init_waitqueue_head(&req_info[o_probe].waitq);
return o_probe;
}
int nr_reqs)
{
int id;
- DEFINE_WAIT(w);
struct xsd_sockmsg *rep;
+ struct xenbus_event_queue queue;
+ struct xenbus_event event_buf;
- /*
- * XXX: should use a predicate loop instead of blindly trusting
- * that $someone didn't wake us up
- */
+ xenbus_event_queue_init(&queue);
- id = allocate_xenbus_id();
- add_waiter(w, req_info[id].waitq);
+ id = allocate_xenbus_id(&queue,&event_buf);
xb_write(type, id, trans, io, nr_reqs);
- schedule();
- remove_waiter(w, req_info[id].waitq);
- wake(current);
+ struct xenbus_event *event = await_event(&queue);
+ BUG_ON(event != &event_buf);
rep = req_info[id].reply;
BUG_ON(rep->req_id != id);