ia64/xen-unstable

changeset 7293:8016551fde98

Refactor xenbus to break up the xenbus_lock and permit watches
to fire concurrently with request/reply pairs. Remove
watch_ack message: no longer needed.

Signed-off-by: Keir Fraser <keir@xensource.com>
author kaf24@firebug.cl.cam.ac.uk
date Sun Oct 09 18:52:54 2005 +0100 (2005-10-09)
parents ab93a9a46bd4
children 5df423407700
files linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c linux-2.6-xen-sparse/arch/xen/kernel/reboot.c linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c linux-2.6-xen-sparse/include/asm-xen/xenbus.h tools/blktap/xenbus.c tools/console/daemon/io.c tools/python/xen/lowlevel/xs/xs.c tools/python/xen/xend/xenstore/xswatch.py tools/xenstore/testsuite/07watch.test tools/xenstore/testsuite/08transaction.test tools/xenstore/testsuite/10domain-homedir.test tools/xenstore/testsuite/11domain-watch.test tools/xenstore/testsuite/12readonly.test tools/xenstore/testsuite/13watch-ack.test tools/xenstore/xenstored_core.c tools/xenstore/xenstored_core.h tools/xenstore/xenstored_watch.c tools/xenstore/xs.c tools/xenstore/xs.h tools/xenstore/xs_test.c xen/include/public/io/xs_wire.h
line diff
     1.1 --- a/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c	Sun Oct 09 12:29:24 2005 -0400
     1.2 +++ b/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c	Sun Oct 09 18:52:54 2005 +0100
     1.3 @@ -1327,18 +1327,14 @@ static struct xenbus_watch cpu_watch = {
     1.4  	.callback = handle_vcpu_hotplug_event
     1.5  };
     1.6  
     1.7 -/* NB: Assumes xenbus_lock is held! */
     1.8  static int setup_cpu_watcher(struct notifier_block *notifier,
     1.9  			      unsigned long event, void *data)
    1.10  {
    1.11 -	int err = 0;
    1.12 +	int err;
    1.13  
    1.14 -	BUG_ON(down_trylock(&xenbus_lock) == 0);
    1.15  	err = register_xenbus_watch(&cpu_watch);
    1.16 -
    1.17 -	if (err) {
    1.18 +	if (err)
    1.19  		printk("Failed to register watch on /cpu\n");
    1.20 -	}
    1.21  
    1.22  	return NOTIFY_DONE;
    1.23  }
     2.1 --- a/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c	Sun Oct 09 12:29:24 2005 -0400
     2.2 +++ b/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c	Sun Oct 09 18:52:54 2005 +0100
     2.3 @@ -360,9 +360,6 @@ static struct xenbus_watch sysrq_watch =
     2.4  
     2.5  static struct notifier_block xenstore_notifier;
     2.6  
     2.7 -/* Setup our watcher
     2.8 -   NB: Assumes xenbus_lock is held!
     2.9 -*/
    2.10  static int setup_shutdown_watcher(struct notifier_block *notifier,
    2.11                                    unsigned long event,
    2.12                                    void *data)
    2.13 @@ -372,8 +369,6 @@ static int setup_shutdown_watcher(struct
    2.14  	int err2 = 0;
    2.15  #endif
    2.16  
    2.17 -	BUG_ON(down_trylock(&xenbus_lock) == 0);
    2.18 -
    2.19  	err1 = register_xenbus_watch(&shutdown_watch);
    2.20  #ifdef CONFIG_MAGIC_SYSRQ
    2.21  	err2 = register_xenbus_watch(&sysrq_watch);
     3.1 --- a/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c	Sun Oct 09 12:29:24 2005 -0400
     3.2 +++ b/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c	Sun Oct 09 18:52:54 2005 +0100
     3.3 @@ -370,17 +370,12 @@ static void watch_target(struct xenbus_w
     3.4      
     3.5  }
     3.6  
     3.7 -/* Setup our watcher
     3.8 -   NB: Assumes xenbus_lock is held!
     3.9 -*/
    3.10  int balloon_init_watcher(struct notifier_block *notifier,
    3.11                           unsigned long event,
    3.12                           void *data)
    3.13  {
    3.14  	int err;
    3.15  
    3.16 -	BUG_ON(down_trylock(&xenbus_lock) == 0);
    3.17 -
    3.18  	err = register_xenbus_watch(&target_watch);
    3.19  	if (err)
    3.20  		printk(KERN_ERR "Failed to set balloon watcher\n");
     4.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c	Sun Oct 09 12:29:24 2005 -0400
     4.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c	Sun Oct 09 18:52:54 2005 +0100
     4.3 @@ -130,15 +130,10 @@ int xb_write(const void *data, unsigned 
     4.4  
     4.5  		wait_event(xb_waitq, output_avail(out));
     4.6  
     4.7 -		/* Read, then check: not that we don't trust store.
     4.8 -		 * Hell, some of my best friends are daemons.  But,
     4.9 -		 * in this post-911 world... */
    4.10 +		mb();
    4.11  		h = *out;
    4.12 -		mb();
    4.13 -		if (!check_buffer(&h)) {
    4.14 -			set_current_state(TASK_RUNNING);
    4.15 -			return -EIO; /* ETERRORIST! */
    4.16 -		}
    4.17 +		if (!check_buffer(&h))
    4.18 +			return -EIO;
    4.19  
    4.20  		dst = get_output_chunk(&h, out->buf, &avail);
    4.21  		if (avail > len)
    4.22 @@ -173,12 +168,11 @@ int xb_read(void *data, unsigned len)
    4.23  		const char *src;
    4.24  
    4.25  		wait_event(xb_waitq, xs_input_avail());
    4.26 -		h = *in;
    4.27 +
    4.28  		mb();
    4.29 -		if (!check_buffer(&h)) {
    4.30 -			set_current_state(TASK_RUNNING);
    4.31 +		h = *in;
    4.32 +		if (!check_buffer(&h))
    4.33  			return -EIO;
    4.34 -		}
    4.35  
    4.36  		src = get_input_chunk(&h, in->buf, &avail);
    4.37  		if (avail > len)
    4.38 @@ -195,10 +189,6 @@ int xb_read(void *data, unsigned len)
    4.39  			notify_remote_via_evtchn(xen_start_info->store_evtchn);
    4.40  	}
    4.41  
    4.42 -	/* If we left something, wake watch thread to deal with it. */
    4.43 -	if (xs_input_avail())
    4.44 -		wake_up(&xb_waitq);
    4.45 -
    4.46  	return 0;
    4.47  }
    4.48  
     5.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c	Sun Oct 09 12:29:24 2005 -0400
     5.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c	Sun Oct 09 18:52:54 2005 +0100
     5.3 @@ -46,85 +46,113 @@
     5.4  #include <asm/hypervisor.h>
     5.5  
     5.6  struct xenbus_dev_data {
     5.7 -	/* Are there bytes left to be read in this message? */
     5.8 -	int bytes_left;
     5.9 -	/* Are we still waiting for the reply to a message we wrote? */
    5.10 -	int awaiting_reply;
    5.11 -	/* Buffer for outgoing messages. */
    5.12 +	int in_transaction;
    5.13 +
    5.14 +	/* Partial request. */
    5.15  	unsigned int len;
    5.16  	union {
    5.17  		struct xsd_sockmsg msg;
    5.18  		char buffer[PAGE_SIZE];
    5.19  	} u;
    5.20 +
    5.21 +	/* Response queue. */
    5.22 +#define MASK_READ_IDX(idx) ((idx)&(PAGE_SIZE-1))
    5.23 +	char read_buffer[PAGE_SIZE];
    5.24 +	unsigned int read_cons, read_prod;
    5.25 +	wait_queue_head_t read_waitq;
    5.26  };
    5.27  
    5.28  static struct proc_dir_entry *xenbus_dev_intf;
    5.29  
    5.30 -/* Reply can be long (dir, getperm): don't buffer, just examine
    5.31 - * headers so we can discard rest if they die. */
    5.32  static ssize_t xenbus_dev_read(struct file *filp,
    5.33  			       char __user *ubuf,
    5.34  			       size_t len, loff_t *ppos)
    5.35  {
    5.36 -	struct xenbus_dev_data *data = filp->private_data;
    5.37 -	struct xsd_sockmsg msg;
    5.38 -	int err;
    5.39 -
    5.40 -	/* Refill empty buffer? */
    5.41 -	if (data->bytes_left == 0) {
    5.42 -		if (len < sizeof(msg))
    5.43 -			return -EINVAL;
    5.44 +	struct xenbus_dev_data *u = filp->private_data;
    5.45 +	int i;
    5.46  
    5.47 -		err = xb_read(&msg, sizeof(msg));
    5.48 -		if (err)
    5.49 -			return err;
    5.50 -		data->bytes_left = msg.len;
    5.51 -		if (ubuf && copy_to_user(ubuf, &msg, sizeof(msg)) != 0)
    5.52 -			return -EFAULT;
    5.53 -		/* We can receive spurious XS_WATCH_EVENT messages. */
    5.54 -		if (msg.type != XS_WATCH_EVENT)
    5.55 -			data->awaiting_reply = 0;
    5.56 -		return sizeof(msg);
    5.57 +	if (wait_event_interruptible(u->read_waitq,
    5.58 +				     u->read_prod != u->read_cons))
    5.59 +		return -EINTR;
    5.60 +
    5.61 +	for (i = 0; i < len; i++) {
    5.62 +		if (u->read_cons == u->read_prod)
    5.63 +			break;
    5.64 +		put_user(u->read_buffer[MASK_READ_IDX(u->read_cons)], ubuf+i);
    5.65 +		u->read_cons++;
    5.66  	}
    5.67  
    5.68 -	/* Don't read over next header, or over temporary buffer. */
    5.69 -	if (len > sizeof(data->u.buffer))
    5.70 -		len = sizeof(data->u.buffer);
    5.71 -	if (len > data->bytes_left)
    5.72 -		len = data->bytes_left;
    5.73 -
    5.74 -	err = xb_read(data->u.buffer, len);
    5.75 -	if (err)
    5.76 -		return err;
    5.77 -
    5.78 -	data->bytes_left -= len;
    5.79 -	if (ubuf && copy_to_user(ubuf, data->u.buffer, len) != 0)
    5.80 -		return -EFAULT;
    5.81 -	return len;
    5.82 +	return i;
    5.83  }
    5.84  
    5.85 -/* We do v. basic sanity checking so they don't screw up kernel later. */
    5.86 +static void queue_reply(struct xenbus_dev_data *u,
    5.87 +			char *data, unsigned int len)
    5.88 +{
    5.89 +	int i;
    5.90 +
    5.91 +	for (i = 0; i < len; i++, u->read_prod++)
    5.92 +		u->read_buffer[MASK_READ_IDX(u->read_prod)] = data[i];
    5.93 +
    5.94 +	BUG_ON((u->read_prod - u->read_cons) > sizeof(u->read_buffer));
    5.95 +
    5.96 +	wake_up(&u->read_waitq);
    5.97 +}
    5.98 +
    5.99  static ssize_t xenbus_dev_write(struct file *filp,
   5.100  				const char __user *ubuf,
   5.101  				size_t len, loff_t *ppos)
   5.102  {
   5.103 -	struct xenbus_dev_data *data = filp->private_data;
   5.104 -	int err;
   5.105 +	struct xenbus_dev_data *u = filp->private_data;
   5.106 +	void *reply;
   5.107 +	int err = 0;
   5.108  
   5.109 -	/* We gather data in buffer until we're ready to send it. */
   5.110 -	if (len > data->len + sizeof(data->u))
   5.111 +	if ((len + u->len) > sizeof(u->u.buffer))
   5.112  		return -EINVAL;
   5.113 -	if (copy_from_user(data->u.buffer + data->len, ubuf, len) != 0)
   5.114 +
   5.115 +	if (copy_from_user(u->u.buffer + u->len, ubuf, len) != 0)
   5.116  		return -EFAULT;
   5.117 -	data->len += len;
   5.118 -	if (data->len >= sizeof(data->u.msg) + data->u.msg.len) {
   5.119 -		err = xb_write(data->u.buffer, data->len);
   5.120 -		if (err)
   5.121 -			return err;
   5.122 -		data->len = 0;
   5.123 -		data->awaiting_reply = 1;
   5.124 +
   5.125 +	u->len += len;
   5.126 +	if (u->len < (sizeof(u->u.msg) + u->u.msg.len))
   5.127 +		return len;
   5.128 +
   5.129 +	switch (u->u.msg.type) {
   5.130 +	case XS_TRANSACTION_START:
   5.131 +	case XS_TRANSACTION_END:
   5.132 +	case XS_DIRECTORY:
   5.133 +	case XS_READ:
   5.134 +	case XS_GET_PERMS:
   5.135 +	case XS_RELEASE:
   5.136 +	case XS_GET_DOMAIN_PATH:
   5.137 +	case XS_WRITE:
   5.138 +	case XS_MKDIR:
   5.139 +	case XS_RM:
   5.140 +	case XS_SET_PERMS:
   5.141 +		reply = xenbus_dev_request_and_reply(&u->u.msg);
   5.142 +		if (IS_ERR(reply))
   5.143 +			err = PTR_ERR(reply);
   5.144 +		else {
   5.145 +			if (u->u.msg.type == XS_TRANSACTION_START)
   5.146 +				u->in_transaction = 1;
   5.147 +			if (u->u.msg.type == XS_TRANSACTION_END)
   5.148 +				u->in_transaction = 0;
   5.149 +			queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
   5.150 +			queue_reply(u, (char *)reply, u->u.msg.len);
   5.151 +			kfree(reply);
   5.152 +		}
   5.153 +		break;
   5.154 +
   5.155 +	default:
   5.156 +		err = -EINVAL;
   5.157 +		break;
   5.158  	}
   5.159 -	return len;
   5.160 +
   5.161 +	if (err == 0) {
   5.162 +		u->len = 0;
   5.163 +		err = len;
   5.164 +	}
   5.165 +
   5.166 +	return err;
   5.167  }
   5.168  
   5.169  static int xenbus_dev_open(struct inode *inode, struct file *filp)
   5.170 @@ -134,7 +162,6 @@ static int xenbus_dev_open(struct inode 
   5.171  	if (xen_start_info->store_evtchn == 0)
   5.172  		return -ENOENT;
   5.173  
   5.174 -	/* Don't try seeking. */
   5.175  	nonseekable_open(inode, filp);
   5.176  
   5.177  	u = kmalloc(sizeof(*u), GFP_KERNEL);
   5.178 @@ -142,28 +169,21 @@ static int xenbus_dev_open(struct inode 
   5.179  		return -ENOMEM;
   5.180  
   5.181  	memset(u, 0, sizeof(*u));
   5.182 +	init_waitqueue_head(&u->read_waitq);
   5.183  
   5.184  	filp->private_data = u;
   5.185  
   5.186 -	down(&xenbus_lock);
   5.187 -
   5.188  	return 0;
   5.189  }
   5.190  
   5.191  static int xenbus_dev_release(struct inode *inode, struct file *filp)
   5.192  {
   5.193 -	struct xenbus_dev_data *data = filp->private_data;
   5.194 -
   5.195 -	/* Discard any unread replies. */
   5.196 -	while (data->bytes_left || data->awaiting_reply)
   5.197 -		xenbus_dev_read(filp, NULL, sizeof(data->u.buffer), NULL);
   5.198 +	struct xenbus_dev_data *u = filp->private_data;
   5.199  
   5.200 -	/* Harmless if no transaction in progress. */
   5.201 -	xenbus_transaction_end(1);
   5.202 +	if (u->in_transaction)
   5.203 +		xenbus_transaction_end(1);
   5.204  
   5.205 -	up(&xenbus_lock);
   5.206 -
   5.207 -	kfree(data);
   5.208 +	kfree(u);
   5.209  
   5.210  	return 0;
   5.211  }
     6.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c	Sun Oct 09 12:29:24 2005 -0400
     6.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c	Sun Oct 09 18:52:54 2005 +0100
     6.3 @@ -43,6 +43,9 @@
     6.4  
     6.5  static struct notifier_block *xenstore_chain;
     6.6  
     6.7 +/* Now used to protect xenbus probes against save/restore. */
     6.8 +static DECLARE_MUTEX(xenbus_lock);
     6.9 +
    6.10  /* If something in array of ids matches this device, return it. */
    6.11  static const struct xenbus_device_id *
    6.12  match_device(const struct xenbus_device_id *arr, struct xenbus_device *dev)
    6.13 @@ -625,12 +628,13 @@ void xenbus_suspend(void)
    6.14  	down(&xenbus_lock);
    6.15  	bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, suspend_dev);
    6.16  	bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, suspend_dev);
    6.17 +	xs_suspend();
    6.18  }
    6.19  
    6.20  void xenbus_resume(void)
    6.21  {
    6.22  	xb_init_comms();
    6.23 -	reregister_xenbus_watches();
    6.24 +	xs_resume();
    6.25  	bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, resume_dev);
    6.26  	bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, resume_dev);
    6.27  	up(&xenbus_lock);
    6.28 @@ -685,6 +689,7 @@ int do_xenbus_probe(void *unused)
    6.29  	/* Notify others that xenstore is up */
    6.30  	notifier_call_chain(&xenstore_chain, 0, 0);
    6.31  	up(&xenbus_lock);
    6.32 +
    6.33  	return 0;
    6.34  }
    6.35  
     7.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c	Sun Oct 09 12:29:24 2005 -0400
     7.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c	Sun Oct 09 18:52:54 2005 +0100
     7.3 @@ -42,11 +42,67 @@
     7.4  
     7.5  #define streq(a, b) (strcmp((a), (b)) == 0)
     7.6  
     7.7 -static char printf_buffer[4096];
     7.8 -static LIST_HEAD(watches);
     7.9 +struct xs_stored_msg {
    7.10 +	struct xsd_sockmsg hdr;
    7.11 +
    7.12 +	union {
    7.13 +		/* Stored replies. */
    7.14 +		struct {
    7.15 +			struct list_head list;
    7.16 +			char *body;
    7.17 +		} reply;
    7.18 +
    7.19 +		/* Queued watch callbacks. */
    7.20 +		struct {
    7.21 +			struct work_struct work;
    7.22 +			struct xenbus_watch *handle;
    7.23 +			char **vec;
    7.24 +			unsigned int vec_size;
    7.25 +		} watch;
    7.26 +	} u;
    7.27 +};
    7.28 +
    7.29 +struct xs_handle {
    7.30 +	/* A list of replies. Currently only one will ever be outstanding. */
    7.31 +	struct list_head reply_list;
    7.32 +	spinlock_t reply_lock;
    7.33 +	wait_queue_head_t reply_waitq;
    7.34 +
    7.35 +	/* One request at a time. */
    7.36 +	struct semaphore request_mutex;
    7.37  
    7.38 -DECLARE_MUTEX(xenbus_lock);
    7.39 -EXPORT_SYMBOL(xenbus_lock);
    7.40 +	/* One transaction at a time. */
    7.41 +	struct semaphore transaction_mutex;
    7.42 +	int transaction_pid;
    7.43 +};
    7.44 +
    7.45 +static struct xs_handle xs_state;
    7.46 +
    7.47 +static LIST_HEAD(watches);
    7.48 +static DEFINE_SPINLOCK(watches_lock);
    7.49 +
    7.50 +/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
    7.51 +static int xs_resuming;
    7.52 +static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
    7.53 +
    7.54 +static void request_mutex_acquire(void)
    7.55 +{
    7.56 +	/*
    7.57 +	 * We can't distinguish non-transactional from transactional
    7.58 +	 * requests right now. So temporarily acquire the transaction mutex
    7.59 +	 * if this task is outside transaction context.
    7.60 + 	 */
    7.61 +	if (xs_state.transaction_pid != current->pid)
    7.62 +		down(&xs_state.transaction_mutex);
    7.63 +	down(&xs_state.request_mutex);
    7.64 +}
    7.65 +
    7.66 +static void request_mutex_release(void)
    7.67 +{
    7.68 +	up(&xs_state.request_mutex);
    7.69 +	if (xs_state.transaction_pid != current->pid)
    7.70 +		up(&xs_state.transaction_mutex);
    7.71 +}
    7.72  
    7.73  static int get_error(const char *errorstring)
    7.74  {
    7.75 @@ -65,29 +121,32 @@ static int get_error(const char *errorst
    7.76  
    7.77  static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
    7.78  {
    7.79 -	struct xsd_sockmsg msg;
    7.80 -	void *ret;
    7.81 -	int err;
    7.82 +	struct xs_stored_msg *msg;
    7.83 +	char *body;
    7.84 +
    7.85 +	spin_lock(&xs_state.reply_lock);
    7.86  
    7.87 -	err = xb_read(&msg, sizeof(msg));
    7.88 -	if (err)
    7.89 -		return ERR_PTR(err);
    7.90 -
    7.91 -	ret = kmalloc(msg.len + 1, GFP_KERNEL);
    7.92 -	if (!ret)
    7.93 -		return ERR_PTR(-ENOMEM);
    7.94 +	while (list_empty(&xs_state.reply_list)) {
    7.95 +		spin_unlock(&xs_state.reply_lock);
    7.96 +		wait_event(xs_state.reply_waitq,
    7.97 +			   !list_empty(&xs_state.reply_list));
    7.98 +		spin_lock(&xs_state.reply_lock);
    7.99 +	}
   7.100  
   7.101 -	err = xb_read(ret, msg.len);
   7.102 -	if (err) {
   7.103 -		kfree(ret);
   7.104 -		return ERR_PTR(err);
   7.105 -	}
   7.106 -	((char*)ret)[msg.len] = '\0';
   7.107 +	msg = list_entry(xs_state.reply_list.next,
   7.108 +			 struct xs_stored_msg, u.reply.list);
   7.109 +	list_del(&msg->u.reply.list);
   7.110 +
   7.111 +	spin_unlock(&xs_state.reply_lock);
   7.112  
   7.113 -	*type = msg.type;
   7.114 +	*type = msg->hdr.type;
   7.115  	if (len)
   7.116 -		*len = msg.len;
   7.117 -	return ret;
   7.118 +		*len = msg->hdr.len;
   7.119 +	body = msg->u.reply.body;
   7.120 +
   7.121 +	kfree(msg);
   7.122 +
   7.123 +	return body;
   7.124  }
   7.125  
   7.126  /* Emergency write. */
   7.127 @@ -98,10 +157,45 @@ void xenbus_debug_write(const char *str,
   7.128  	msg.type = XS_DEBUG;
   7.129  	msg.len = sizeof("print") + count + 1;
   7.130  
   7.131 +	request_mutex_acquire();
   7.132  	xb_write(&msg, sizeof(msg));
   7.133  	xb_write("print", sizeof("print"));
   7.134  	xb_write(str, count);
   7.135  	xb_write("", 1);
   7.136 +	request_mutex_release();
   7.137 +}
   7.138 +
   7.139 +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
   7.140 +{
   7.141 +	void *ret;
   7.142 +	struct xsd_sockmsg req_msg = *msg;
   7.143 +	int err;
   7.144 +
   7.145 +	if (req_msg.type == XS_TRANSACTION_START) {
   7.146 +		down(&xs_state.transaction_mutex);
   7.147 +		xs_state.transaction_pid = current->pid;
   7.148 +	}
   7.149 +
   7.150 +	request_mutex_acquire();
   7.151 +
   7.152 +	err = xb_write(msg, sizeof(*msg) + msg->len);
   7.153 +	if (err) {
   7.154 +		msg->type = XS_ERROR;
   7.155 +		ret = ERR_PTR(err);
   7.156 +	} else {
   7.157 +		ret = read_reply(&msg->type, &msg->len);
   7.158 +	}
   7.159 +
   7.160 +	request_mutex_release();
   7.161 +
   7.162 +	if ((msg->type == XS_TRANSACTION_END) ||
   7.163 +	    ((req_msg.type == XS_TRANSACTION_START) &&
   7.164 +	     (msg->type == XS_ERROR))) {
   7.165 +		xs_state.transaction_pid = -1;
   7.166 +		up(&xs_state.transaction_mutex);
   7.167 +	}
   7.168 +
   7.169 +	return ret;
   7.170  }
   7.171  
   7.172  /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
   7.173 @@ -115,31 +209,33 @@ static void *xs_talkv(enum xsd_sockmsg_t
   7.174  	unsigned int i;
   7.175  	int err;
   7.176  
   7.177 -	WARN_ON(down_trylock(&xenbus_lock) == 0);
   7.178 -
   7.179  	msg.type = type;
   7.180  	msg.len = 0;
   7.181  	for (i = 0; i < num_vecs; i++)
   7.182  		msg.len += iovec[i].iov_len;
   7.183  
   7.184 +	request_mutex_acquire();
   7.185 +
   7.186  	err = xb_write(&msg, sizeof(msg));
   7.187 -	if (err)
   7.188 +	if (err) {
   7.189 +		up(&xs_state.request_mutex);
   7.190  		return ERR_PTR(err);
   7.191 +	}
   7.192  
   7.193  	for (i = 0; i < num_vecs; i++) {
   7.194  		err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
   7.195 -		if (err)
   7.196 +		if (err) {
   7.197 +			request_mutex_release();
   7.198  			return ERR_PTR(err);
   7.199 +		}
   7.200  	}
   7.201  
   7.202 -	/* Watches can have fired before reply comes: daemon detects
   7.203 -	 * and re-transmits, so we can ignore this. */
   7.204 -	do {
   7.205 -		kfree(ret);
   7.206 -		ret = read_reply(&msg.type, len);
   7.207 -		if (IS_ERR(ret))
   7.208 -			return ret;
   7.209 -	} while (msg.type == XS_WATCH_EVENT);
   7.210 +	ret = read_reply(&msg.type, len);
   7.211 +
   7.212 +	request_mutex_release();
   7.213 +
   7.214 +	if (IS_ERR(ret))
   7.215 +		return ret;
   7.216  
   7.217  	if (msg.type == XS_ERROR) {
   7.218  		err = get_error(ret);
   7.219 @@ -187,8 +283,6 @@ static char *join(const char *dir, const
   7.220  {
   7.221  	static char buffer[4096];
   7.222  
   7.223 -	BUG_ON(down_trylock(&xenbus_lock) == 0);
   7.224 -	/* XXX FIXME: might not be correct if name == "" */
   7.225  	BUG_ON(strlen(dir) + strlen("/") + strlen(name) + 1 > sizeof(buffer));
   7.226  
   7.227  	strcpy(buffer, dir);
   7.228 @@ -207,7 +301,7 @@ static char **split(char *strings, unsig
   7.229  	*num = count_strings(strings, len);
   7.230  
   7.231  	/* Transfer to one big alloc for easy freeing. */
   7.232 -	ret = kmalloc(*num * sizeof(char *) + len, GFP_ATOMIC);
   7.233 +	ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL);
   7.234  	if (!ret) {
   7.235  		kfree(strings);
   7.236  		return ERR_PTR(-ENOMEM);
   7.237 @@ -298,7 +392,18 @@ EXPORT_SYMBOL(xenbus_rm);
   7.238   */
   7.239  int xenbus_transaction_start(void)
   7.240  {
   7.241 -	return xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
   7.242 +	int err;
   7.243 +
   7.244 +	down(&xs_state.transaction_mutex);
   7.245 +	xs_state.transaction_pid = current->pid;
   7.246 +
   7.247 +	err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
   7.248 +	if (err) {
   7.249 +		xs_state.transaction_pid = -1;
   7.250 +		up(&xs_state.transaction_mutex);
   7.251 +	}
   7.252 +
   7.253 +	return err;
   7.254  }
   7.255  EXPORT_SYMBOL(xenbus_transaction_start);
   7.256  
   7.257 @@ -308,12 +413,19 @@ EXPORT_SYMBOL(xenbus_transaction_start);
   7.258  int xenbus_transaction_end(int abort)
   7.259  {
   7.260  	char abortstr[2];
   7.261 +	int err;
   7.262  
   7.263  	if (abort)
   7.264  		strcpy(abortstr, "F");
   7.265  	else
   7.266  		strcpy(abortstr, "T");
   7.267 -	return xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
   7.268 +
   7.269 +	err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
   7.270 +
   7.271 +	xs_state.transaction_pid = -1;
   7.272 +	up(&xs_state.transaction_mutex);
   7.273 +
   7.274 +	return err;
   7.275  }
   7.276  EXPORT_SYMBOL(xenbus_transaction_end);
   7.277  
   7.278 @@ -344,14 +456,23 @@ int xenbus_printf(const char *dir, const
   7.279  {
   7.280  	va_list ap;
   7.281  	int ret;
   7.282 +#define PRINTF_BUFFER_SIZE 4096
   7.283 +	char *printf_buffer;
   7.284  
   7.285 -	BUG_ON(down_trylock(&xenbus_lock) == 0);
   7.286 +	printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
   7.287 +	if (printf_buffer == NULL)
   7.288 +		return -ENOMEM;
   7.289 +
   7.290  	va_start(ap, fmt);
   7.291 -	ret = vsnprintf(printf_buffer, sizeof(printf_buffer), fmt, ap);
   7.292 +	ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
   7.293  	va_end(ap);
   7.294  
   7.295 -	BUG_ON(ret > sizeof(printf_buffer)-1);
   7.296 -	return xenbus_write(dir, node, printf_buffer);
   7.297 +	BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
   7.298 +	ret = xenbus_write(dir, node, printf_buffer);
   7.299 +
   7.300 +	kfree(printf_buffer);
   7.301 +
   7.302 +	return ret;
   7.303  }
   7.304  EXPORT_SYMBOL(xenbus_printf);
   7.305  
   7.306 @@ -361,19 +482,28 @@ void xenbus_dev_error(struct xenbus_devi
   7.307  	va_list ap;
   7.308  	int ret;
   7.309  	unsigned int len;
   7.310 +	char *printf_buffer;
   7.311  
   7.312 -	BUG_ON(down_trylock(&xenbus_lock) == 0);
   7.313 +	printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
   7.314 +	if (printf_buffer == NULL)
   7.315 +		goto fail;
   7.316  
   7.317  	len = sprintf(printf_buffer, "%i ", -err);
   7.318  	va_start(ap, fmt);
   7.319 -	ret = vsnprintf(printf_buffer+len, sizeof(printf_buffer)-len, fmt, ap);
   7.320 +	ret = vsnprintf(printf_buffer+len, PRINTF_BUFFER_SIZE-len, fmt, ap);
   7.321  	va_end(ap);
   7.322  
   7.323 -	BUG_ON(len + ret > sizeof(printf_buffer)-1);
   7.324 +	BUG_ON(len + ret > PRINTF_BUFFER_SIZE-1);
   7.325  	dev->has_error = 1;
   7.326  	if (xenbus_write(dev->nodename, "error", printf_buffer) != 0)
   7.327 -		printk("xenbus: failed to write error node for %s (%s)\n",
   7.328 -		       dev->nodename, printf_buffer);
   7.329 +		goto fail;
   7.330 +
   7.331 +	kfree(printf_buffer);
   7.332 +	return;
   7.333 +
   7.334 + fail:
   7.335 +	printk("xenbus: failed to write error node for %s (%s)\n",
   7.336 +	       dev->nodename, printf_buffer);
   7.337  }
   7.338  EXPORT_SYMBOL(xenbus_dev_error);
   7.339  
   7.340 @@ -432,26 +562,6 @@ static int xs_watch(const char *path, co
   7.341  	return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
   7.342  }
   7.343  
   7.344 -static char **xs_read_watch(unsigned int *num)
   7.345 -{
   7.346 -	enum xsd_sockmsg_type type;
   7.347 -	char *strings;
   7.348 -	unsigned int len;
   7.349 -
   7.350 -	strings = read_reply(&type, &len);
   7.351 -	if (IS_ERR(strings))
   7.352 -		return (char **)strings;
   7.353 -
   7.354 -	BUG_ON(type != XS_WATCH_EVENT);
   7.355 -
   7.356 -	return split(strings, len, num);
   7.357 -}
   7.358 -
   7.359 -static int xs_acknowledge_watch(const char *token)
   7.360 -{
   7.361 -	return xs_error(xs_single(XS_WATCH_ACK, token, NULL));
   7.362 -}
   7.363 -
   7.364  static int xs_unwatch(const char *path, const char *token)
   7.365  {
   7.366  	struct kvec iov[2];
   7.367 @@ -464,7 +574,6 @@ static int xs_unwatch(const char *path, 
   7.368  	return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
   7.369  }
   7.370  
   7.371 -/* A little paranoia: we don't just trust token. */
   7.372  static struct xenbus_watch *find_watch(const char *token)
   7.373  {
   7.374  	struct xenbus_watch *i, *cmp;
   7.375 @@ -474,6 +583,7 @@ static struct xenbus_watch *find_watch(c
   7.376  	list_for_each_entry(i, &watches, list)
   7.377  		if (i == cmp)
   7.378  			return i;
   7.379 +
   7.380  	return NULL;
   7.381  }
   7.382  
   7.383 @@ -485,11 +595,20 @@ int register_xenbus_watch(struct xenbus_
   7.384  	int err;
   7.385  
   7.386  	sprintf(token, "%lX", (long)watch);
   7.387 +
   7.388 +	spin_lock(&watches_lock);
   7.389  	BUG_ON(find_watch(token));
   7.390 +	spin_unlock(&watches_lock);
   7.391  
   7.392  	err = xs_watch(watch->node, token);
   7.393 -	if (!err)
   7.394 +
   7.395 +	/* Ignore errors due to multiple registration. */
   7.396 +	if ((err == 0) || (err == -EEXIST)) {
   7.397 +		spin_lock(&watches_lock);
   7.398  		list_add(&watch->list, &watches);
   7.399 +		spin_unlock(&watches_lock);
   7.400 +	}
   7.401 +
   7.402  	return err;
   7.403  }
   7.404  EXPORT_SYMBOL(register_xenbus_watch);
   7.405 @@ -500,77 +619,188 @@ void unregister_xenbus_watch(struct xenb
   7.406  	int err;
   7.407  
   7.408  	sprintf(token, "%lX", (long)watch);
   7.409 +
   7.410 +	spin_lock(&watches_lock);
   7.411  	BUG_ON(!find_watch(token));
   7.412 +	list_del(&watch->list);
   7.413 +	spin_unlock(&watches_lock);
   7.414 +
   7.415 +	/* Ensure xs_resume() is not in progress (see comments there). */
   7.416 +	wait_event(xs_resuming_waitq, !xs_resuming);
   7.417  
   7.418  	err = xs_unwatch(watch->node, token);
   7.419 -	list_del(&watch->list);
   7.420 -
   7.421  	if (err)
   7.422  		printk(KERN_WARNING
   7.423  		       "XENBUS Failed to release watch %s: %i\n",
   7.424  		       watch->node, err);
   7.425 +
   7.426 +	/* Make sure watch is not in use. */
   7.427 +	flush_scheduled_work();
   7.428  }
   7.429  EXPORT_SYMBOL(unregister_xenbus_watch);
   7.430  
   7.431 -/* Re-register callbacks to all watches. */
   7.432 -void reregister_xenbus_watches(void)
   7.433 +void xs_suspend(void)
   7.434  {
   7.435 +	down(&xs_state.transaction_mutex);
   7.436 +	down(&xs_state.request_mutex);
   7.437 +}
   7.438 +
   7.439 +void xs_resume(void)
   7.440 +{
   7.441 +	struct list_head *ent, *prev_ent = &watches;
   7.442  	struct xenbus_watch *watch;
   7.443  	char token[sizeof(watch) * 2 + 1];
   7.444  
   7.445 -	list_for_each_entry(watch, &watches, list) {
   7.446 -		sprintf(token, "%lX", (long)watch);
   7.447 -		xs_watch(watch->node, token);
   7.448 +	/* Protect against concurrent unregistration and freeing of watches. */
   7.449 +	BUG_ON(xs_resuming);
   7.450 +	xs_resuming = 1;
   7.451 +
   7.452 +	up(&xs_state.request_mutex);
   7.453 +	up(&xs_state.transaction_mutex);
   7.454 +
   7.455 +	/*
   7.456 +	 * Iterate over the watch list re-registering each node. We must
   7.457 +	 * be careful about concurrent registrations and unregistrations.
   7.458 +	 * We search for the node immediately following the previously
   7.459 +	 * re-registered node. If we get no match then either we are done
   7.460 +	 * (previous node is last in list) or the node was unregistered, in
   7.461 +	 * which case we restart from the beginning of the list.
   7.462 +	 * register_xenbus_watch() + unregister_xenbus_watch() is safe because
   7.463 +	 * it will only ever move a watch node earlier in the list, so it
   7.464 +	 * cannot cause us to skip nodes.
   7.465 +	 */
   7.466 +	for (;;) {
   7.467 +		spin_lock(&watches_lock);
   7.468 +		list_for_each(ent, &watches)
   7.469 +			if (ent->prev == prev_ent)
   7.470 +				break;
   7.471 +		spin_unlock(&watches_lock);
   7.472 +
   7.473 +		/* No match because prev_ent is at the end of the list? */
   7.474 +		if ((ent == &watches) && (watches.prev == prev_ent))
   7.475 +			 break; /* We're done! */
   7.476 +
   7.477 +		if ((prev_ent = ent) != &watches) {
   7.478 +			/*
    7.479 +			 * Safe even with watches_lock not held. We are saved by
    7.480 +			 * the (!xs_resuming) wait in unregister_xenbus_watch.
   7.481 +			 */
   7.482 +			watch = list_entry(ent, struct xenbus_watch, list);
   7.483 +			sprintf(token, "%lX", (long)watch);
   7.484 +			xs_watch(watch->node, token);
   7.485 +		}
   7.486  	}
   7.487 +
   7.488 +	xs_resuming = 0;
   7.489 +	wake_up(&xs_resuming_waitq);
   7.490 +}
   7.491 +
   7.492 +static void xenbus_fire_watch(void *arg)
   7.493 +{
   7.494 +	struct xs_stored_msg *msg = arg;
   7.495 +
   7.496 +	msg->u.watch.handle->callback(msg->u.watch.handle,
   7.497 +				      (const char **)msg->u.watch.vec,
   7.498 +				      msg->u.watch.vec_size);
   7.499 +
   7.500 +	kfree(msg->u.watch.vec);
   7.501 +	kfree(msg);
   7.502  }
   7.503  
   7.504 -static int watch_thread(void *unused)
   7.505 +static int process_msg(void)
   7.506  {
   7.507 -	for (;;) {
   7.508 -		char **vec = NULL;
   7.509 -		unsigned int num;
   7.510 +	struct xs_stored_msg *msg;
   7.511 +	char *body;
   7.512 +	int err;
   7.513  
   7.514 -		wait_event(xb_waitq, xs_input_avail());
   7.515 +	msg = kmalloc(sizeof(*msg), GFP_KERNEL);
   7.516 +	if (msg == NULL)
   7.517 +		return -ENOMEM;
   7.518 +
   7.519 +	err = xb_read(&msg->hdr, sizeof(msg->hdr));
   7.520 +	if (err) {
   7.521 +		kfree(msg);
   7.522 +		return err;
   7.523 +	}
   7.524  
   7.525 -		/* If this is a spurious wakeup caused by someone
   7.526 -		 * doing an op, they'll hold the lock and the buffer
   7.527 -		 * will be empty by the time we get there.		 
   7.528 -		 */
   7.529 -		down(&xenbus_lock);
   7.530 -		if (xs_input_avail())
   7.531 -			vec = xs_read_watch(&num);
   7.532 +	body = kmalloc(msg->hdr.len + 1, GFP_KERNEL);
   7.533 +	if (body == NULL) {
   7.534 +		kfree(msg);
   7.535 +		return -ENOMEM;
   7.536 +	}
   7.537 +
   7.538 +	err = xb_read(body, msg->hdr.len);
   7.539 +	if (err) {
   7.540 +		kfree(body);
   7.541 +		kfree(msg);
   7.542 +		return err;
   7.543 +	}
   7.544 +	body[msg->hdr.len] = '\0';
   7.545 +
   7.546 +	if (msg->hdr.type == XS_WATCH_EVENT) {
   7.547 +		INIT_WORK(&msg->u.watch.work, xenbus_fire_watch, msg);
   7.548  
   7.549 -		if (vec && !IS_ERR(vec)) {
   7.550 -			struct xenbus_watch *w;
   7.551 -			int err;
   7.552 +		msg->u.watch.vec = split(body, msg->hdr.len, &msg->u.watch.vec_size);
   7.553 +		if (IS_ERR(msg->u.watch.vec)) {
   7.554 +			err = PTR_ERR(msg->u.watch.vec); /* save before kfree */
   7.555 +			kfree(msg);
   7.556 +			return err;
   7.557 +		}
   7.558  
   7.559 -			err = xs_acknowledge_watch(vec[XS_WATCH_TOKEN]);
   7.560 -			if (err)
   7.561 -				printk(KERN_WARNING "XENBUS ack %s fail %i\n",
   7.562 -				       vec[XS_WATCH_TOKEN], err);
   7.563 -			w = find_watch(vec[XS_WATCH_TOKEN]);
   7.564 -			BUG_ON(!w);
   7.565 -			w->callback(w, (const char **)vec, num);
   7.566 -			kfree(vec);
   7.567 -		} else if (vec)
   7.568 -			printk(KERN_WARNING "XENBUS xs_read_watch: %li\n",
   7.569 -			       PTR_ERR(vec));
   7.570 -		up(&xenbus_lock);
   7.571 +		spin_lock(&watches_lock);
   7.572 +		msg->u.watch.handle = find_watch(
   7.573 +			msg->u.watch.vec[XS_WATCH_TOKEN]);
   7.574 +		if (msg->u.watch.handle != NULL) {
   7.575 +			schedule_work(&msg->u.watch.work);
   7.576 +		} else {
   7.577 +			kfree(msg->u.watch.vec);
   7.578 +			kfree(msg);
   7.579 +		}
   7.580 +		spin_unlock(&watches_lock);
   7.581 +	} else {
   7.582 +		msg->u.reply.body = body;
   7.583 +		spin_lock(&xs_state.reply_lock);
   7.584 +		list_add_tail(&msg->u.reply.list, &xs_state.reply_list);
   7.585 +		spin_unlock(&xs_state.reply_lock);
   7.586 +		wake_up(&xs_state.reply_waitq);
   7.587 +	}
   7.588 +
   7.589 +	return 0;
   7.590 +}
   7.591 +
   7.592 +static int read_thread(void *unused)
   7.593 +{
   7.594 +	int err;
   7.595 +
   7.596 +	for (;;) {
   7.597 +		err = process_msg();
   7.598 +		if (err)
   7.599 +			printk(KERN_WARNING "XENBUS error %d while reading "
   7.600 +			       "message\n", err);
   7.601  	}
   7.602  }
   7.603  
   7.604  int xs_init(void)
   7.605  {
   7.606  	int err;
   7.607 -	struct task_struct *watcher;
   7.608 +	struct task_struct *reader;
   7.609 +
   7.610 +	INIT_LIST_HEAD(&xs_state.reply_list);
   7.611 +	spin_lock_init(&xs_state.reply_lock);
   7.612 +	init_waitqueue_head(&xs_state.reply_waitq);
   7.613 +
   7.614 +	init_MUTEX(&xs_state.request_mutex);
   7.615 +	init_MUTEX(&xs_state.transaction_mutex);
   7.616 +	xs_state.transaction_pid = -1;
   7.617  
   7.618  	err = xb_init_comms();
   7.619  	if (err)
   7.620  		return err;
   7.621  	
   7.622 -	watcher = kthread_run(watch_thread, NULL, "kxbwatch");
   7.623 -	if (IS_ERR(watcher))
   7.624 -		return PTR_ERR(watcher);
   7.625 +	reader = kthread_run(read_thread, NULL, "xenbusd");
   7.626 +	if (IS_ERR(reader))
   7.627 +		return PTR_ERR(reader);
   7.628 +
   7.629  	return 0;
   7.630  }
   7.631  
     8.1 --- a/linux-2.6-xen-sparse/include/asm-xen/xenbus.h	Sun Oct 09 12:29:24 2005 -0400
     8.2 +++ b/linux-2.6-xen-sparse/include/asm-xen/xenbus.h	Sun Oct 09 18:52:54 2005 +0100
     8.3 @@ -78,10 +78,6 @@ int xenbus_register_driver(struct xenbus
     8.4  int xenbus_register_backend(struct xenbus_driver *drv);
     8.5  void xenbus_unregister_driver(struct xenbus_driver *drv);
     8.6  
     8.7 -/* Caller must hold this lock to call these functions: it's also held
     8.8 - * across watch callbacks. */
     8.9 -extern struct semaphore xenbus_lock;
    8.10 -
    8.11  char **xenbus_directory(const char *dir, const char *node, unsigned int *num);
    8.12  void *xenbus_read(const char *dir, const char *node, unsigned int *len);
    8.13  int xenbus_write(const char *dir, const char *node, const char *string);
    8.14 @@ -113,7 +109,11 @@ void xenbus_dev_ok(struct xenbus_device 
    8.15  struct xenbus_watch
    8.16  {
    8.17  	struct list_head list;
    8.18 +
    8.19 +	/* Path being watched. */
    8.20  	char *node;
    8.21 +
    8.22 +	/* Callback (executed in a process context with no locks held). */
    8.23  	void (*callback)(struct xenbus_watch *,
    8.24  			 const char **vec, unsigned int len);
    8.25  };
    8.26 @@ -124,7 +124,11 @@ void unregister_xenstore_notifier(struct
    8.27  
    8.28  int register_xenbus_watch(struct xenbus_watch *watch);
    8.29  void unregister_xenbus_watch(struct xenbus_watch *watch);
    8.30 -void reregister_xenbus_watches(void);
    8.31 +void xs_suspend(void);
    8.32 +void xs_resume(void);
    8.33 +
    8.34 +/* Used by xenbus_dev to borrow kernel's store connection. */
    8.35 +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
    8.36  
    8.37  /* Called from xen core code. */
    8.38  void xenbus_suspend(void);
     9.1 --- a/tools/blktap/xenbus.c	Sun Oct 09 12:29:24 2005 -0400
     9.2 +++ b/tools/blktap/xenbus.c	Sun Oct 09 18:52:54 2005 +0100
     9.3 @@ -260,10 +260,6 @@ int xs_fire_next_watch(struct xs_handle 
     9.4      node  = res[XS_WATCH_PATH];
     9.5      token = res[XS_WATCH_TOKEN];
     9.6  
     9.7 -    er = xs_acknowledge_watch(h, token);
     9.8 -    if (er == 0)
     9.9 -        warn("Couldn't acknowledge watch (%s)", token);
    9.10 -
    9.11      w = find_watch(token);
    9.12      if (!w)
    9.13      {
    10.1 --- a/tools/console/daemon/io.c	Sun Oct 09 12:29:24 2005 -0400
    10.2 +++ b/tools/console/daemon/io.c	Sun Oct 09 18:52:54 2005 +0100
    10.3 @@ -505,7 +505,6 @@ static void handle_xs(int fd)
    10.4  			domain_create_ring(dom);
    10.5  	}
    10.6  
    10.7 -	xs_acknowledge_watch(xs, vec[1]);
    10.8  	free(vec);
    10.9  }
   10.10  
    11.1 --- a/tools/python/xen/lowlevel/xs/xs.c	Sun Oct 09 12:29:24 2005 -0400
    11.2 +++ b/tools/python/xen/lowlevel/xs/xs.c	Sun Oct 09 18:52:54 2005 +0100
    11.3 @@ -442,9 +442,6 @@ static PyObject *xspy_watch(PyObject *se
    11.4  
    11.5  #define xspy_read_watch_doc "\n"				\
    11.6  	"Read a watch notification.\n"				\
    11.7 -	"The notification must be acknowledged by passing\n"	\
    11.8 -	"the token to acknowledge_watch().\n"			\
    11.9 -	" path [string]: xenstore path.\n"			\
   11.10  	"\n"							\
   11.11  	"Returns: [tuple] (path, token).\n"			\
   11.12  	"Raises RuntimeError on error.\n"			\
   11.13 @@ -495,44 +492,6 @@ static PyObject *xspy_read_watch(PyObjec
   11.14      return val;
   11.15  }
   11.16  
   11.17 -#define xspy_acknowledge_watch_doc "\n"					\
   11.18 -	"Acknowledge a watch notification that has been read.\n"	\
   11.19 -	" token [string] : from the watch notification\n"		\
   11.20 -	"\n"								\
   11.21 -	"Returns None on success.\n"					\
   11.22 -	"Raises RuntimeError on error.\n"				\
   11.23 -	"\n"
   11.24 -
   11.25 -static PyObject *xspy_acknowledge_watch(PyObject *self, PyObject *args,
   11.26 -                                        PyObject *kwds)
   11.27 -{
   11.28 -    static char *kwd_spec[] = { "token", NULL };
   11.29 -    static char *arg_spec = "O";
   11.30 -    PyObject *token;
   11.31 -    char token_str[MAX_STRLEN(unsigned long) + 1];
   11.32 -
   11.33 -    struct xs_handle *xh = xshandle(self);
   11.34 -    PyObject *val = NULL;
   11.35 -    int xsval = 0;
   11.36 -
   11.37 -    if (!xh)
   11.38 -        goto exit;
   11.39 -    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec, &token))
   11.40 -        goto exit;
   11.41 -    sprintf(token_str, "%li", (unsigned long)token);
   11.42 -    Py_BEGIN_ALLOW_THREADS
   11.43 -    xsval = xs_acknowledge_watch(xh, token_str);
   11.44 -    Py_END_ALLOW_THREADS
   11.45 -    if (!xsval) {
   11.46 -        PyErr_SetFromErrno(PyExc_RuntimeError);
   11.47 -        goto exit;
   11.48 -    }
   11.49 -    Py_INCREF(Py_None);
   11.50 -    val = Py_None;
   11.51 - exit:
   11.52 -    return val;
   11.53 -}
   11.54 -
   11.55  #define xspy_unwatch_doc "\n"				\
   11.56  	"Stop watching a path.\n"			\
   11.57  	" path  [string] : xenstore path.\n"		\
   11.58 @@ -833,7 +792,6 @@ static PyMethodDef xshandle_methods[] = 
   11.59       XSPY_METH(set_permissions),
   11.60       XSPY_METH(watch),
   11.61       XSPY_METH(read_watch),
   11.62 -     XSPY_METH(acknowledge_watch),
   11.63       XSPY_METH(unwatch),
   11.64       XSPY_METH(transaction_start),
   11.65       XSPY_METH(transaction_end),
    12.1 --- a/tools/python/xen/xend/xenstore/xswatch.py	Sun Oct 09 12:29:24 2005 -0400
    12.2 +++ b/tools/python/xen/xend/xenstore/xswatch.py	Sun Oct 09 18:52:54 2005 +0100
    12.3 @@ -8,6 +8,7 @@
    12.4  import select
    12.5  import threading
    12.6  from xen.lowlevel import xs
    12.7 +from xen.xend.xenstore.xsutil import xshandle
    12.8  
    12.9  class xswatch:
   12.10  
   12.11 @@ -27,10 +28,7 @@ class xswatch:
   12.12          if cls.watchThread:
   12.13              cls.xslock.release()
   12.14              return
   12.15 -        # XXX: When we fix xenstored to have better watch semantics,
   12.16 -        # this can change to shared xshandle(). Currently that would result
   12.17 -        # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
   12.18 -        cls.xs = xs.open()
   12.19 +        cls.xs = xshandle()
   12.20          cls.watchThread = threading.Thread(name="Watcher",
   12.21                                             target=cls.watchMain)
   12.22          cls.watchThread.setDaemon(True)
   12.23 @@ -43,11 +41,10 @@ class xswatch:
   12.24          while True:
   12.25              try:
   12.26                  we = cls.xs.read_watch()
   12.27 -                watch = we[1]
   12.28 -                cls.xs.acknowledge_watch(watch)
   12.29              except RuntimeError, ex:
   12.30                  print ex
   12.31                  raise
   12.32 +            watch = we[1]
   12.33              watch.fn(*watch.args, **watch.kwargs)
   12.34  
   12.35      watchMain = classmethod(watchMain)
    13.1 --- a/tools/xenstore/testsuite/07watch.test	Sun Oct 09 12:29:24 2005 -0400
    13.2 +++ b/tools/xenstore/testsuite/07watch.test	Sun Oct 09 18:52:54 2005 +0100
    13.3 @@ -5,7 +5,6 @@ 1 watch /test token
    13.4  2 write /test contents2
    13.5  expect 1:/test:token
    13.6  1 waitwatch
    13.7 -1 ackwatch token
    13.8  1 close
    13.9  
   13.10  # Check that reads don't set it off.
   13.11 @@ -22,15 +21,12 @@ 1 watch /dir token
   13.12  2 mkdir /dir/newdir
   13.13  expect 1:/dir/newdir:token
   13.14  1 waitwatch
   13.15 -1 ackwatch token
   13.16  2 setperm /dir/newdir 0 READ
   13.17  expect 1:/dir/newdir:token
   13.18  1 waitwatch
   13.19 -1 ackwatch token
   13.20  2 rm /dir/newdir
   13.21  expect 1:/dir/newdir:token
   13.22  1 waitwatch
   13.23 -1 ackwatch token
   13.24  1 close
   13.25  2 close
   13.26  
   13.27 @@ -49,7 +45,6 @@ expect contents
   13.28  read /dir/test
   13.29  expect /dir/test:token
   13.30  waitwatch
   13.31 -ackwatch token
   13.32  close
   13.33  
   13.34  # watch priority test: all simultaneous
   13.35 @@ -59,13 +54,10 @@ 2 watch /dir token2
   13.36  write /dir/test contents
   13.37  expect 3:/dir/test:token3
   13.38  3 waitwatch
   13.39 -3 ackwatch token3
   13.40  expect 2:/dir/test:token2
   13.41  2 waitwatch
   13.42 -2 ackwatch token2
   13.43  expect 1:/dir/test:token1
   13.44  1 waitwatch
   13.45 -1 ackwatch token1
   13.46  1 close
   13.47  2 close
   13.48  3 close
   13.49 @@ -79,7 +71,6 @@ 2 waitwatch
   13.50  2 close
   13.51  expect 1:/dir/test:token1
   13.52  1 waitwatch
   13.53 -1 ackwatch token1
   13.54  1 close
   13.55  
   13.56  # If one dies (without reading at all), the other should still get ack.
   13.57 @@ -89,7 +80,6 @@ write /dir/test contents
   13.58  2 close
   13.59  expect 1:/dir/test:token1
   13.60  1 waitwatch
   13.61 -1 ackwatch token1
   13.62  1 close
   13.63  2 close
   13.64  
   13.65 @@ -111,7 +101,6 @@ write /dir/test contents
   13.66  2 unwatch /dir token2
   13.67  expect 1:/dir/test:token1
   13.68  1 waitwatch
   13.69 -1 ackwatch token1
   13.70  1 close
   13.71  2 close
   13.72  
   13.73 @@ -123,14 +112,12 @@ 1 watch /dir/test token2
   13.74  write /dir/test contents2
   13.75  expect 1:/dir/test:token2
   13.76  1 waitwatch
   13.77 -1 ackwatch token2
   13.78  
   13.79  # check we only get notified once.
   13.80  1 watch /test token
   13.81  2 write /test contents2
   13.82  expect 1:/test:token
   13.83  1 waitwatch
   13.84 -1 ackwatch token
   13.85  expect 1: waitwatch failed: Connection timed out
   13.86  1 waitwatch
   13.87  1 close
   13.88 @@ -142,13 +129,10 @@ 2 write /test2 contents
   13.89  2 write /test3 contents
   13.90  expect 1:/test1:token
   13.91  1 waitwatch
   13.92 -1 ackwatch token
   13.93  expect 1:/test2:token
   13.94  1 waitwatch
   13.95 -1 ackwatch token
   13.96  expect 1:/test3:token
   13.97  1 waitwatch
   13.98 -1 ackwatch token
   13.99  1 close
  13.100  
  13.101  # Creation of subpaths should be covered correctly.
  13.102 @@ -157,10 +141,8 @@ 2 write /test/subnode contents2
  13.103  2 write /test/subnode/subnode contents2
  13.104  expect 1:/test/subnode:token
  13.105  1 waitwatch
  13.106 -1 ackwatch token
  13.107  expect 1:/test/subnode/subnode:token
  13.108  1 waitwatch
  13.109 -1 ackwatch token
  13.110  expect 1: waitwatch failed: Connection timed out
  13.111  1 waitwatch
  13.112  1 close
  13.113 @@ -171,7 +153,6 @@ 2 write /test/subnode contents2
  13.114  1 watchnoack / token2 0
  13.115  expect 1:/test/subnode:token
  13.116  1 waitwatch
  13.117 -1 ackwatch token
  13.118  expect 1:/:token2
  13.119  1 waitwatch
  13.120  expect 1: waitwatch failed: Connection timed out
  13.121 @@ -183,7 +164,6 @@ 1 watch /test/subnode token
  13.122  2 rm /test
  13.123  expect 1:/test/subnode:token
  13.124  1 waitwatch
  13.125 -1 ackwatch token
  13.126  
  13.127  # Watch should not double-send after we ack, even if we did something in between.
  13.128  1 watch /test2 token
  13.129 @@ -192,6 +172,5 @@ expect 1:/test2/foo:token
  13.130  1 waitwatch
  13.131  expect 1:contents2
  13.132  1 read /test2/foo
  13.133 -1 ackwatch token
  13.134  expect 1: waitwatch failed: Connection timed out
  13.135  1 waitwatch
    14.1 --- a/tools/xenstore/testsuite/08transaction.test	Sun Oct 09 12:29:24 2005 -0400
    14.2 +++ b/tools/xenstore/testsuite/08transaction.test	Sun Oct 09 18:52:54 2005 +0100
    14.3 @@ -68,7 +68,6 @@ 2 mkdir /test/dir/sub
    14.4  2 commit
    14.5  expect 1:/test/dir/sub:token
    14.6  1 waitwatch
    14.7 -1 ackwatch token
    14.8  1 close
    14.9  
   14.10  # Rm inside transaction works like rm outside: children get notified.
   14.11 @@ -78,7 +77,6 @@ 2 rm /test/dir
   14.12  2 commit
   14.13  expect 1:/test/dir/sub:token
   14.14  1 waitwatch
   14.15 -1 ackwatch token
   14.16  1 close
   14.17  
   14.18  # Multiple events from single transaction don't trigger assert
   14.19 @@ -89,8 +87,6 @@ 2 write /test/2 contents
   14.20  2 commit
   14.21  expect 1:/test/1:token
   14.22  1 waitwatch
   14.23 -1 ackwatch token
   14.24  expect 1:/test/2:token
   14.25  1 waitwatch
   14.26 -1 ackwatch token
   14.27  1 close
    15.1 --- a/tools/xenstore/testsuite/10domain-homedir.test	Sun Oct 09 12:29:24 2005 -0400
    15.2 +++ b/tools/xenstore/testsuite/10domain-homedir.test	Sun Oct 09 18:52:54 2005 +0100
    15.3 @@ -16,4 +16,3 @@ 1 watch foo token
    15.4  write /home/foo/bar contents
    15.5  expect 1:foo/bar:token
    15.6  1 waitwatch
    15.7 -1 ackwatch token
    16.1 --- a/tools/xenstore/testsuite/11domain-watch.test	Sun Oct 09 12:29:24 2005 -0400
    16.2 +++ b/tools/xenstore/testsuite/11domain-watch.test	Sun Oct 09 18:52:54 2005 +0100
    16.3 @@ -10,7 +10,6 @@ 1 watch /test token
    16.4  write /test contents2
    16.5  expect 1:/test:token
    16.6  1 waitwatch
    16.7 -1 ackwatch token
    16.8  1 unwatch /test token
    16.9  release 1
   16.10  1 close
   16.11 @@ -25,7 +24,6 @@ 1 write /dir/test3 contents3
   16.12  1 write /dir/test4 contents4
   16.13  expect 1:/dir/test:token
   16.14  1 waitwatch
   16.15 -1 ackwatch token
   16.16  release 1
   16.17  1 close
   16.18  
    17.1 --- a/tools/xenstore/testsuite/12readonly.test	Sun Oct 09 12:29:24 2005 -0400
    17.2 +++ b/tools/xenstore/testsuite/12readonly.test	Sun Oct 09 18:52:54 2005 +0100
    17.3 @@ -36,4 +36,3 @@ 1 readwrite
    17.4  1 write /test contents
    17.5  expect /test:token
    17.6  waitwatch
    17.7 -ackwatch token
    18.1 --- a/tools/xenstore/testsuite/13watch-ack.test	Sun Oct 09 12:29:24 2005 -0400
    18.2 +++ b/tools/xenstore/testsuite/13watch-ack.test	Sun Oct 09 18:52:54 2005 +0100
    18.3 @@ -18,5 +18,4 @@ expect 1:/test/2:token2
    18.4  1 waitwatch
    18.5  3 write /test/1 contents1
    18.6  4 write /test/3 contents3
    18.7 -1 ackwatch token2
    18.8  1 close
    19.1 --- a/tools/xenstore/xenstored_core.c	Sun Oct 09 12:29:24 2005 -0400
    19.2 +++ b/tools/xenstore/xenstored_core.c	Sun Oct 09 18:52:54 2005 +0100
    19.3 @@ -154,7 +154,6 @@ static char *sockmsg_string(enum xsd_soc
    19.4  	case XS_READ: return "READ";
    19.5  	case XS_GET_PERMS: return "GET_PERMS";
    19.6  	case XS_WATCH: return "WATCH";
    19.7 -	case XS_WATCH_ACK: return "WATCH_ACK";
    19.8  	case XS_UNWATCH: return "UNWATCH";
    19.9  	case XS_TRANSACTION_START: return "TRANSACTION_START";
   19.10  	case XS_TRANSACTION_END: return "TRANSACTION_END";
   19.11 @@ -1103,10 +1102,6 @@ static void process_message(struct conne
   19.12  		do_watch(conn, in);
   19.13  		break;
   19.14  
   19.15 -	case XS_WATCH_ACK:
   19.16 -		do_watch_ack(conn, onearg(in));
   19.17 -		break;
   19.18 -
   19.19  	case XS_UNWATCH:
   19.20  		do_unwatch(conn, in);
   19.21  		break;
   19.22 @@ -1168,11 +1163,6 @@ static void consider_message(struct conn
   19.23  		xprintf("Got message %s len %i from %p\n",
   19.24  			sockmsg_string(type), conn->in->hdr.msg.len, conn);
   19.25  
   19.26 -	/* We might get a command while waiting for an ack: this means
   19.27 -	 * the other end discarded it: we will re-transmit. */
   19.28 -	if (type != XS_WATCH_ACK)
   19.29 -		conn->waiting_for_ack = NULL;
   19.30 -
   19.31  	/* Careful: process_message may free connection.  We detach
   19.32  	 * "in" beforehand and allocate the new buffer to avoid
   19.33  	 * touching conn after process_message.
   19.34 @@ -1266,7 +1256,6 @@ struct connection *new_connection(connwr
   19.35  
   19.36  	new->state = OK;
   19.37  	new->out = new->waiting_reply = NULL;
   19.38 -	new->waiting_for_ack = NULL;
   19.39  	new->fd = -1;
   19.40  	new->id = 0;
   19.41  	new->domain = NULL;
    20.1 --- a/tools/xenstore/xenstored_core.h	Sun Oct 09 12:29:24 2005 -0400
    20.2 +++ b/tools/xenstore/xenstored_core.h	Sun Oct 09 18:52:54 2005 +0100
    20.3 @@ -71,9 +71,6 @@ struct connection
    20.4  	/* Is this a read-only connection? */
    20.5  	bool can_write;
    20.6  
    20.7 -	/* Are we waiting for a watch event ack? */
    20.8 -	struct watch *waiting_for_ack;
    20.9 -
   20.10  	/* Buffered incoming data. */
   20.11  	struct buffered_data *in;
   20.12  
    21.1 --- a/tools/xenstore/xenstored_watch.c	Sun Oct 09 12:29:24 2005 -0400
    21.2 +++ b/tools/xenstore/xenstored_watch.c	Sun Oct 09 18:52:54 2005 +0100
    21.3 @@ -69,18 +69,14 @@ void queue_next_event(struct connection 
    21.4  	if (conn->waiting_reply) {
    21.5  		conn->out = conn->waiting_reply;
    21.6  		conn->waiting_reply = NULL;
    21.7 -		conn->waiting_for_ack = NULL;
    21.8  		return;
    21.9  	}
   21.10  
   21.11 -	/* If we're already waiting for ack, don't queue more. */
   21.12 -	if (conn->waiting_for_ack)
   21.13 -		return;
   21.14 -
   21.15  	list_for_each_entry(watch, &conn->watches, list) {
   21.16  		event = list_top(&watch->events, struct watch_event, list);
   21.17  		if (event) {
   21.18 -			conn->waiting_for_ack = watch;
   21.19 +			list_del(&event->list);
   21.20 +			talloc_free(event);
   21.21  			send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
   21.22  			break;
   21.23  		}
   21.24 @@ -181,6 +177,15 @@ void do_watch(struct connection *conn, s
   21.25  		}
   21.26  	}
   21.27  
   21.28 +	/* Check for duplicates. */
   21.29 +	list_for_each_entry(watch, &conn->watches, list) {
   21.30 +		if (streq(watch->node, vec[0]) &&
   21.31 +                    streq(watch->token, vec[1])) {
   21.32 +			send_error(conn, EEXIST);
   21.33 +			return;
   21.34 +		}
   21.35 +	}
   21.36 +
   21.37  	watch = talloc(conn, struct watch);
   21.38  	watch->node = talloc_strdup(watch, vec[0]);
   21.39  	watch->token = talloc_strdup(watch, vec[1]);
   21.40 @@ -200,37 +205,6 @@ void do_watch(struct connection *conn, s
   21.41  	add_event(conn, watch, watch->node);
   21.42  }
   21.43  
   21.44 -void do_watch_ack(struct connection *conn, const char *token)
   21.45 -{
   21.46 -	struct watch_event *event;
   21.47 -
   21.48 -	if (!token) {
   21.49 -		send_error(conn, EINVAL);
   21.50 -		return;
   21.51 -	}
   21.52 -
   21.53 -	if (!conn->waiting_for_ack) {
   21.54 -		send_error(conn, ENOENT);
   21.55 -		return;
   21.56 -	}
   21.57 -
   21.58 -	if (!streq(conn->waiting_for_ack->token, token)) {
   21.59 -		/* They're confused: this will cause us to send event again */
   21.60 -		conn->waiting_for_ack = NULL;
   21.61 -		send_error(conn, EINVAL);
   21.62 -		return;
   21.63 -	}
   21.64 -
   21.65 -	/* Remove event: after ack sent, core will call queue_next_event */
   21.66 -	event = list_top(&conn->waiting_for_ack->events, struct watch_event,
   21.67 -			 list);
   21.68 -	list_del(&event->list);
   21.69 -	talloc_free(event);
   21.70 -
   21.71 -	conn->waiting_for_ack = NULL;
   21.72 -	send_ack(conn, XS_WATCH_ACK);
   21.73 -}
   21.74 -
   21.75  void do_unwatch(struct connection *conn, struct buffered_data *in)
   21.76  {
   21.77  	struct watch *watch;
   21.78 @@ -241,9 +215,6 @@ void do_unwatch(struct connection *conn,
   21.79  		return;
   21.80  	}
   21.81  
   21.82 -	/* We don't need to worry if we're waiting for an ack for the
   21.83 -	 * watch we're deleting: conn->waiting_for_ack was reset by
   21.84 -	 * this command in consider_message anyway. */
   21.85  	node = canonicalize(conn, vec[0]);
   21.86  	list_for_each_entry(watch, &conn->watches, list) {
   21.87  		if (streq(watch->node, node) && streq(watch->token, vec[1])) {
   21.88 @@ -262,11 +233,6 @@ void dump_watches(struct connection *con
   21.89  	struct watch *watch;
   21.90  	struct watch_event *event;
   21.91  
   21.92 -	if (conn->waiting_for_ack)
   21.93 -		printf("    waiting_for_ack for watch on %s token %s\n",
   21.94 -		       conn->waiting_for_ack->node,
   21.95 -		       conn->waiting_for_ack->token);
   21.96 -
   21.97  	list_for_each_entry(watch, &conn->watches, list) {
   21.98  		printf("    watch on %s token %s\n",
   21.99  		       watch->node, watch->token);
    22.1 --- a/tools/xenstore/xs.c	Sun Oct 09 12:29:24 2005 -0400
    22.2 +++ b/tools/xenstore/xs.c	Sun Oct 09 18:52:54 2005 +0100
    22.3 @@ -78,10 +78,30 @@ struct xs_handle {
    22.4  
    22.5  	/* One transaction at a time. */
    22.6  	pthread_mutex_t transaction_mutex;
    22.7 +	pthread_t transaction_pthread;
    22.8  };
    22.9  
   22.10  static void *read_thread(void *arg);
   22.11  
   22.12 +static void request_mutex_acquire(struct xs_handle *h)
   22.13 +{
   22.14 +	/*
   22.15 +	 * We can't distinguish non-transactional from transactional
   22.16 +	 * requests right now. So temporarily acquire the transaction mutex
   22.17 +	 * if this task is outside transaction context.
   22.18 + 	 */
   22.19 +	if (h->transaction_pthread != pthread_self())
   22.20 +		pthread_mutex_lock(&h->transaction_mutex);
   22.21 +	pthread_mutex_lock(&h->request_mutex);
   22.22 +}
   22.23 +
   22.24 +static void request_mutex_release(struct xs_handle *h)
   22.25 +{
   22.26 +	pthread_mutex_unlock(&h->request_mutex);
   22.27 +	if (h->transaction_pthread != pthread_self())
   22.28 +		pthread_mutex_unlock(&h->transaction_mutex);
   22.29 +}
   22.30 +
   22.31  int xs_fileno(struct xs_handle *h)
   22.32  {
   22.33  	char c = 0;
   22.34 @@ -163,6 +183,7 @@ static struct xs_handle *get_handle(cons
   22.35  
   22.36  	pthread_mutex_init(&h->request_mutex, NULL);
   22.37  	pthread_mutex_init(&h->transaction_mutex, NULL);
   22.38 +	h->transaction_pthread = -1;
   22.39  
   22.40  	if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
   22.41  		goto error;
   22.42 @@ -316,7 +337,7 @@ static void *xs_talkv(struct xs_handle *
   22.43  	ignorepipe.sa_flags = 0;
   22.44  	sigaction(SIGPIPE, &ignorepipe, &oldact);
   22.45  
   22.46 -	pthread_mutex_lock(&h->request_mutex);
   22.47 +	request_mutex_acquire(h);
   22.48  
   22.49  	if (!xs_write_all(h->fd, &msg, sizeof(msg)))
   22.50  		goto fail;
   22.51 @@ -329,7 +350,7 @@ static void *xs_talkv(struct xs_handle *
   22.52  	if (!ret)
   22.53  		goto fail;
   22.54  
   22.55 -	pthread_mutex_unlock(&h->request_mutex);
   22.56 +	request_mutex_release(h);
   22.57  
   22.58  	sigaction(SIGPIPE, &oldact, NULL);
   22.59  	if (msg.type == XS_ERROR) {
   22.60 @@ -350,7 +371,7 @@ static void *xs_talkv(struct xs_handle *
   22.61  fail:
   22.62  	/* We're in a bad state, so close fd. */
   22.63  	saved_errno = errno;
   22.64 -	pthread_mutex_unlock(&h->request_mutex);
   22.65 +	request_mutex_release(h);
   22.66  	sigaction(SIGPIPE, &oldact, NULL);
   22.67  close_fd:
   22.68  	close(h->fd);
   22.69 @@ -593,15 +614,6 @@ char **xs_read_watch(struct xs_handle *h
   22.70  	return ret;
   22.71  }
   22.72  
   22.73 -/* Acknowledge watch on node.  Watches must be acknowledged before
   22.74 - * any other watches can be read.
   22.75 - * Returns false on failure.
   22.76 - */
   22.77 -bool xs_acknowledge_watch(struct xs_handle *h, const char *token)
   22.78 -{
   22.79 -	return xs_bool(xs_single(h, XS_WATCH_ACK, token, NULL));
   22.80 -}
   22.81 -
   22.82  /* Remove a watch on a node.
   22.83   * Returns false on failure (no watch on that node).
   22.84   */
   22.85 @@ -624,8 +636,18 @@ bool xs_unwatch(struct xs_handle *h, con
   22.86   */
   22.87  bool xs_transaction_start(struct xs_handle *h)
   22.88  {
   22.89 +	bool rc;
   22.90 +
   22.91  	pthread_mutex_lock(&h->transaction_mutex);
   22.92 -	return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
   22.93 +	h->transaction_pthread = pthread_self();
   22.94 +
   22.95 +	rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
   22.96 +	if (!rc) {
   22.97 +		h->transaction_pthread = -1;
   22.98 +		pthread_mutex_unlock(&h->transaction_mutex);
   22.99 +	}
  22.100 +
  22.101 +	return rc;
  22.102  }
  22.103  
  22.104  /* End a transaction.
  22.105 @@ -645,6 +667,7 @@ bool xs_transaction_end(struct xs_handle
  22.106  	
  22.107  	rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
  22.108  
  22.109 +	h->transaction_pthread = -1;
  22.110  	pthread_mutex_unlock(&h->transaction_mutex);
  22.111  
  22.112  	return rc;
    23.1 --- a/tools/xenstore/xs.h	Sun Oct 09 12:29:24 2005 -0400
    23.2 +++ b/tools/xenstore/xs.h	Sun Oct 09 18:52:54 2005 +0100
    23.3 @@ -96,12 +96,6 @@ int xs_fileno(struct xs_handle *h);
    23.4   */
    23.5  char **xs_read_watch(struct xs_handle *h, unsigned int *num);
    23.6  
    23.7 -/* Acknowledge watch on node.  Watches must be acknowledged before
    23.8 - * any other watches can be read.
    23.9 - * Returns false on failure.
   23.10 - */
   23.11 -bool xs_acknowledge_watch(struct xs_handle *h, const char *token);
   23.12 -
   23.13  /* Remove a watch on a node: implicitly acks any outstanding watch.
   23.14   * Returns false on failure (no watch on that node).
   23.15   */
    24.1 --- a/tools/xenstore/xs_test.c	Sun Oct 09 12:29:24 2005 -0400
    24.2 +++ b/tools/xenstore/xs_test.c	Sun Oct 09 18:52:54 2005 +0100
    24.3 @@ -201,7 +201,6 @@ static void __attribute__((noreturn)) us
    24.4  	     "  watch <path> <token>\n"
    24.5  	     "  watchnoack <path> <token>\n"
    24.6  	     "  waitwatch\n"
    24.7 -	     "  ackwatch <token>\n"
    24.8  	     "  unwatch <path> <token>\n"
    24.9  	     "  close\n"
   24.10  	     "  start <node>\n"
   24.11 @@ -455,8 +454,6 @@ static void do_watch(unsigned int handle
   24.12  		    !streq(vec[XS_WATCH_PATH], node) ||
   24.13  		    !streq(vec[XS_WATCH_TOKEN], token))
   24.14  			failed(handle);
   24.15 -		if (!xs_acknowledge_watch(handles[handle], token))
   24.16 -			failed(handle);
   24.17  	}
   24.18  }
   24.19  
   24.20 @@ -515,12 +512,6 @@ static void do_waitwatch(unsigned int ha
   24.21  	free(vec);
   24.22  }
   24.23  
   24.24 -static void do_ackwatch(unsigned int handle, const char *token)
   24.25 -{
   24.26 -	if (!xs_acknowledge_watch(handles[handle], token))
   24.27 -		failed(handle);
   24.28 -}
   24.29 -
   24.30  static void do_unwatch(unsigned int handle, const char *node, const char *token)
   24.31  {
   24.32  	if (!xs_unwatch(handles[handle], node, token))
   24.33 @@ -746,8 +737,6 @@ static void do_command(unsigned int defa
   24.34  		do_watch(handle, arg(line, 1), arg(line, 2), false);
   24.35  	else if (streq(command, "waitwatch"))
   24.36  		do_waitwatch(handle);
   24.37 -	else if (streq(command, "ackwatch"))
   24.38 -		do_ackwatch(handle, arg(line, 1));
   24.39  	else if (streq(command, "unwatch"))
   24.40  		do_unwatch(handle, arg(line, 1), arg(line, 2));
   24.41  	else if (streq(command, "close")) {
    25.1 --- a/xen/include/public/io/xs_wire.h	Sun Oct 09 12:29:24 2005 -0400
    25.2 +++ b/xen/include/public/io/xs_wire.h	Sun Oct 09 18:52:54 2005 +0100
    25.3 @@ -35,11 +35,9 @@ enum xsd_sockmsg_type
    25.4  	XS_READ,
    25.5  	XS_GET_PERMS,
    25.6  	XS_WATCH,
    25.7 -	XS_WATCH_ACK,
    25.8  	XS_UNWATCH,
    25.9  	XS_TRANSACTION_START,
   25.10  	XS_TRANSACTION_END,
   25.11 -	XS_OP_READ_ONLY = XS_TRANSACTION_END,
   25.12  	XS_INTRODUCE,
   25.13  	XS_RELEASE,
   25.14  	XS_GET_DOMAIN_PATH,