ia64/xen-unstable

changeset 7300:015f8ae81276

xenstored now supports multiple concurrent transactions per
connection, plus interleaving of transactional and
non-transactional accesses. A transaction identifier is added
to the xsd_sockmsg header structure (0 means 'not in context
of a transaction'). The user and kernel xs interfaces accept
a pointer to a transaction handle where appropriate --
currently this is directly cast to an integer identifier in
the client library / kernel driver, but using a pointer type
will allow extra dynamic client-side state to be kept in
future if we need it.

The transaction mutex has now gone. It's replaced with a
read-write mutex, which is held for reading while a transaction
is in progress and is only acquired for exclusive access during
suspend/resume, to ensure there are no in-progress transactions.

Signed-off-by: Keir Fraser <keir@xensource.com>
author kaf24@firebug.cl.cam.ac.uk
date Mon Oct 10 15:38:01 2005 +0100 (2005-10-10)
parents 1ac39c7a0435
children f9bd8df8a098
files linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c tools/xenstore/utils.h tools/xenstore/xenstore_client.c tools/xenstore/xenstored_core.c tools/xenstore/xenstored_core.h tools/xenstore/xenstored_transaction.c tools/xenstore/xenstored_transaction.h tools/xenstore/xs.c
line diff
     1.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c	Mon Oct 10 14:46:53 2005 +0100
     1.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c	Mon Oct 10 15:38:01 2005 +0100
     1.3 @@ -45,8 +45,14 @@
     1.4  #include <asm-xen/xen_proc.h>
     1.5  #include <asm/hypervisor.h>
     1.6  
     1.7 +struct xenbus_dev_transaction {
     1.8 +	struct list_head list;
     1.9 +	struct xenbus_transaction *handle;
    1.10 +};
    1.11 +
    1.12  struct xenbus_dev_data {
    1.13 -	int in_transaction;
    1.14 +	/* In-progress transaction. */
    1.15 +	struct list_head transactions;
    1.16  
    1.17  	/* Partial request. */
    1.18  	unsigned int len;
    1.19 @@ -103,6 +109,7 @@ static ssize_t xenbus_dev_write(struct f
    1.20  				size_t len, loff_t *ppos)
    1.21  {
    1.22  	struct xenbus_dev_data *u = filp->private_data;
    1.23 +	struct xenbus_dev_transaction *trans;
    1.24  	void *reply;
    1.25  	int err = 0;
    1.26  
    1.27 @@ -129,13 +136,24 @@ static ssize_t xenbus_dev_write(struct f
    1.28  	case XS_RM:
    1.29  	case XS_SET_PERMS:
    1.30  		reply = xenbus_dev_request_and_reply(&u->u.msg);
    1.31 -		if (IS_ERR(reply))
    1.32 +		if (IS_ERR(reply)) {
    1.33  			err = PTR_ERR(reply);
    1.34 -		else {
    1.35 -			if (u->u.msg.type == XS_TRANSACTION_START)
    1.36 -				u->in_transaction = 1;
    1.37 -			if (u->u.msg.type == XS_TRANSACTION_END)
    1.38 -				u->in_transaction = 0;
    1.39 +		} else {
    1.40 +			if (u->u.msg.type == XS_TRANSACTION_START) {
    1.41 +				trans = kmalloc(sizeof(*trans), GFP_KERNEL);
    1.42 +				trans->handle = (struct xenbus_transaction *)
    1.43 +					simple_strtoul(reply, NULL, 0);
    1.44 +				list_add(&trans->list, &u->transactions);
    1.45 +			} else if (u->u.msg.type == XS_TRANSACTION_END) {
    1.46 +				list_for_each_entry(trans, &u->transactions,
    1.47 +						    list)
    1.48 +					if ((unsigned long)trans->handle ==
    1.49 +					    (unsigned long)u->u.msg.tx_id)
    1.50 +						break;
    1.51 +				BUG_ON(&trans->list == &u->transactions);
    1.52 +				list_del(&trans->list);
    1.53 +				kfree(trans);
    1.54 +			}
    1.55  			queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
    1.56  			queue_reply(u, (char *)reply, u->u.msg.len);
    1.57  			kfree(reply);
    1.58 @@ -169,6 +187,7 @@ static int xenbus_dev_open(struct inode 
    1.59  		return -ENOMEM;
    1.60  
    1.61  	memset(u, 0, sizeof(*u));
    1.62 +	INIT_LIST_HEAD(&u->transactions);
    1.63  	init_waitqueue_head(&u->read_waitq);
    1.64  
    1.65  	filp->private_data = u;
    1.66 @@ -179,9 +198,13 @@ static int xenbus_dev_open(struct inode 
    1.67  static int xenbus_dev_release(struct inode *inode, struct file *filp)
    1.68  {
    1.69  	struct xenbus_dev_data *u = filp->private_data;
    1.70 +	struct xenbus_dev_transaction *trans, *tmp;
    1.71  
    1.72 -	if (u->in_transaction)
    1.73 -		xenbus_transaction_end((struct xenbus_transaction *)1, 1);
    1.74 +	list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
    1.75 +		xenbus_transaction_end(trans->handle, 1);
    1.76 +		list_del(&trans->list);
    1.77 +		kfree(trans);
    1.78 +	}
    1.79  
    1.80  	kfree(u);
    1.81  
     2.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c	Mon Oct 10 14:46:53 2005 +0100
     2.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c	Mon Oct 10 15:38:01 2005 +0100
     2.3 @@ -71,9 +71,8 @@ struct xs_handle {
     2.4  	/* One request at a time. */
     2.5  	struct semaphore request_mutex;
     2.6  
     2.7 -	/* One transaction at a time. */
     2.8 -	struct semaphore transaction_mutex;
     2.9 -	int transaction_pid;
    2.10 +	/* Protect transactions against save/restore. */
    2.11 +	struct rw_semaphore suspend_mutex;
    2.12  };
    2.13  
    2.14  static struct xs_handle xs_state;
    2.15 @@ -81,29 +80,6 @@ static struct xs_handle xs_state;
    2.16  static LIST_HEAD(watches);
    2.17  static DEFINE_SPINLOCK(watches_lock);
    2.18  
    2.19 -/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
    2.20 -static int xs_resuming;
    2.21 -static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
    2.22 -
    2.23 -static void request_mutex_acquire(void)
    2.24 -{
    2.25 -	/*
    2.26 -	 * We can't distinguish non-transactional from transactional
    2.27 -	 * requests right now. So temporarily acquire the transaction mutex
    2.28 -	 * if this task is outside transaction context.
    2.29 - 	 */
    2.30 -	if (xs_state.transaction_pid != current->pid)
    2.31 -		down(&xs_state.transaction_mutex);
    2.32 -	down(&xs_state.request_mutex);
    2.33 -}
    2.34 -
    2.35 -static void request_mutex_release(void)
    2.36 -{
    2.37 -	up(&xs_state.request_mutex);
    2.38 -	if (xs_state.transaction_pid != current->pid)
    2.39 -		up(&xs_state.transaction_mutex);
    2.40 -}
    2.41 -
    2.42  static int get_error(const char *errorstring)
    2.43  {
    2.44  	unsigned int i;
    2.45 @@ -152,17 +128,17 @@ static void *read_reply(enum xsd_sockmsg
    2.46  /* Emergency write. */
    2.47  void xenbus_debug_write(const char *str, unsigned int count)
    2.48  {
    2.49 -	struct xsd_sockmsg msg;
    2.50 +	struct xsd_sockmsg msg = { 0 };
    2.51  
    2.52  	msg.type = XS_DEBUG;
    2.53  	msg.len = sizeof("print") + count + 1;
    2.54  
    2.55 -	request_mutex_acquire();
    2.56 +	down(&xs_state.request_mutex);
    2.57  	xb_write(&msg, sizeof(msg));
    2.58  	xb_write("print", sizeof("print"));
    2.59  	xb_write(str, count);
    2.60  	xb_write("", 1);
    2.61 -	request_mutex_release();
    2.62 +	up(&xs_state.request_mutex);
    2.63  }
    2.64  
    2.65  void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
    2.66 @@ -171,12 +147,10 @@ void *xenbus_dev_request_and_reply(struc
    2.67  	struct xsd_sockmsg req_msg = *msg;
    2.68  	int err;
    2.69  
    2.70 -	if (req_msg.type == XS_TRANSACTION_START) {
    2.71 -		down(&xs_state.transaction_mutex);
    2.72 -		xs_state.transaction_pid = current->pid;
    2.73 -	}
    2.74 +	if (req_msg.type == XS_TRANSACTION_START)
    2.75 +		down_read(&xs_state.suspend_mutex);
    2.76  
    2.77 -	request_mutex_acquire();
    2.78 +	down(&xs_state.request_mutex);
    2.79  
    2.80  	err = xb_write(msg, sizeof(*msg) + msg->len);
    2.81  	if (err) {
    2.82 @@ -186,20 +160,19 @@ void *xenbus_dev_request_and_reply(struc
    2.83  		ret = read_reply(&msg->type, &msg->len);
    2.84  	}
    2.85  
    2.86 -	request_mutex_release();
    2.87 +	up(&xs_state.request_mutex);
    2.88  
    2.89  	if ((msg->type == XS_TRANSACTION_END) ||
    2.90  	    ((req_msg.type == XS_TRANSACTION_START) &&
    2.91 -	     (msg->type == XS_ERROR))) {
    2.92 -		xs_state.transaction_pid = -1;
    2.93 -		up(&xs_state.transaction_mutex);
    2.94 -	}
    2.95 +	     (msg->type == XS_ERROR)))
    2.96 +		up_read(&xs_state.suspend_mutex);
    2.97  
    2.98  	return ret;
    2.99  }
   2.100  
   2.101  /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
   2.102 -static void *xs_talkv(enum xsd_sockmsg_type type,
   2.103 +static void *xs_talkv(struct xenbus_transaction *t,
   2.104 +		      enum xsd_sockmsg_type type,
   2.105  		      const struct kvec *iovec,
   2.106  		      unsigned int num_vecs,
   2.107  		      unsigned int *len)
   2.108 @@ -209,12 +182,13 @@ static void *xs_talkv(enum xsd_sockmsg_t
   2.109  	unsigned int i;
   2.110  	int err;
   2.111  
   2.112 +	msg.tx_id = (u32)(unsigned long)t;
   2.113  	msg.type = type;
   2.114  	msg.len = 0;
   2.115  	for (i = 0; i < num_vecs; i++)
   2.116  		msg.len += iovec[i].iov_len;
   2.117  
   2.118 -	request_mutex_acquire();
   2.119 +	down(&xs_state.request_mutex);
   2.120  
   2.121  	err = xb_write(&msg, sizeof(msg));
   2.122  	if (err) {
   2.123 @@ -225,14 +199,14 @@ static void *xs_talkv(enum xsd_sockmsg_t
   2.124  	for (i = 0; i < num_vecs; i++) {
   2.125  		err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
   2.126  		if (err) {
   2.127 -			request_mutex_release();
   2.128 +			up(&xs_state.request_mutex);
   2.129  			return ERR_PTR(err);
   2.130  		}
   2.131  	}
   2.132  
   2.133  	ret = read_reply(&msg.type, len);
   2.134  
   2.135 -	request_mutex_release();
   2.136 +	up(&xs_state.request_mutex);
   2.137  
   2.138  	if (IS_ERR(ret))
   2.139  		return ret;
   2.140 @@ -248,14 +222,16 @@ static void *xs_talkv(enum xsd_sockmsg_t
   2.141  }
   2.142  
   2.143  /* Simplified version of xs_talkv: single message. */
   2.144 -static void *xs_single(enum xsd_sockmsg_type type,
   2.145 -		       const char *string, unsigned int *len)
   2.146 +static void *xs_single(struct xenbus_transaction *t,
   2.147 +		       enum xsd_sockmsg_type type,
   2.148 +		       const char *string,
   2.149 +		       unsigned int *len)
   2.150  {
   2.151  	struct kvec iovec;
   2.152  
   2.153  	iovec.iov_base = (void *)string;
   2.154  	iovec.iov_len = strlen(string) + 1;
   2.155 -	return xs_talkv(type, &iovec, 1, len);
   2.156 +	return xs_talkv(t, type, &iovec, 1, len);
   2.157  }
   2.158  
   2.159  /* Many commands only need an ack, don't care what it says. */
   2.160 @@ -322,7 +298,7 @@ char **xenbus_directory(struct xenbus_tr
   2.161  	char *strings;
   2.162  	unsigned int len;
   2.163  
   2.164 -	strings = xs_single(XS_DIRECTORY, join(dir, node), &len);
   2.165 +	strings = xs_single(t, XS_DIRECTORY, join(dir, node), &len);
   2.166  	if (IS_ERR(strings))
   2.167  		return (char **)strings;
   2.168  
   2.169 @@ -352,7 +328,7 @@ EXPORT_SYMBOL(xenbus_exists);
   2.170  void *xenbus_read(struct xenbus_transaction *t,
   2.171  		  const char *dir, const char *node, unsigned int *len)
   2.172  {
   2.173 -	return xs_single(XS_READ, join(dir, node), len);
   2.174 +	return xs_single(t, XS_READ, join(dir, node), len);
   2.175  }
   2.176  EXPORT_SYMBOL(xenbus_read);
   2.177  
   2.178 @@ -372,7 +348,7 @@ int xenbus_write(struct xenbus_transacti
   2.179  	iovec[1].iov_base = (void *)string;
   2.180  	iovec[1].iov_len = strlen(string);
   2.181  
   2.182 -	return xs_error(xs_talkv(XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
   2.183 +	return xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
   2.184  }
   2.185  EXPORT_SYMBOL(xenbus_write);
   2.186  
   2.187 @@ -380,14 +356,14 @@ EXPORT_SYMBOL(xenbus_write);
   2.188  int xenbus_mkdir(struct xenbus_transaction *t,
   2.189  		 const char *dir, const char *node)
   2.190  {
   2.191 -	return xs_error(xs_single(XS_MKDIR, join(dir, node), NULL));
   2.192 +	return xs_error(xs_single(t, XS_MKDIR, join(dir, node), NULL));
   2.193  }
   2.194  EXPORT_SYMBOL(xenbus_mkdir);
   2.195  
   2.196  /* Destroy a file or directory (directories must be empty). */
   2.197  int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node)
   2.198  {
   2.199 -	return xs_error(xs_single(XS_RM, join(dir, node), NULL));
   2.200 +	return xs_error(xs_single(t, XS_RM, join(dir, node), NULL));
   2.201  }
   2.202  EXPORT_SYMBOL(xenbus_rm);
   2.203  
   2.204 @@ -396,18 +372,21 @@ EXPORT_SYMBOL(xenbus_rm);
   2.205   */
   2.206  struct xenbus_transaction *xenbus_transaction_start(void)
   2.207  {
   2.208 -	int err;
   2.209 +	char *id_str;
   2.210 +	unsigned long id;
   2.211  
   2.212 -	down(&xs_state.transaction_mutex);
   2.213 -	xs_state.transaction_pid = current->pid;
   2.214 +	down_read(&xs_state.suspend_mutex);
   2.215  
   2.216 -	err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
   2.217 -	if (err) {
   2.218 -		xs_state.transaction_pid = -1;
   2.219 -		up(&xs_state.transaction_mutex);
   2.220 +	id_str = xs_single(NULL, XS_TRANSACTION_START, "", NULL);
   2.221 +	if (IS_ERR(id_str)) {
   2.222 +		up_read(&xs_state.suspend_mutex);
   2.223 +		return (struct xenbus_transaction *)id_str;
   2.224  	}
   2.225  
   2.226 -	return err ? ERR_PTR(err) : (struct xenbus_transaction *)1;
   2.227 +	id = simple_strtoul(id_str, NULL, 0);
   2.228 +	kfree(id_str);
   2.229 +
   2.230 +	return (struct xenbus_transaction *)id;
   2.231  }
   2.232  EXPORT_SYMBOL(xenbus_transaction_start);
   2.233  
   2.234 @@ -419,17 +398,14 @@ int xenbus_transaction_end(struct xenbus
   2.235  	char abortstr[2];
   2.236  	int err;
   2.237  
   2.238 -	BUG_ON(t == NULL);
   2.239 -
   2.240  	if (abort)
   2.241  		strcpy(abortstr, "F");
   2.242  	else
   2.243  		strcpy(abortstr, "T");
   2.244  
   2.245 -	err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
   2.246 +	err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
   2.247  
   2.248 -	xs_state.transaction_pid = -1;
   2.249 -	up(&xs_state.transaction_mutex);
   2.250 +	up_read(&xs_state.suspend_mutex);
   2.251  
   2.252  	return err;
   2.253  }
   2.254 @@ -567,7 +543,8 @@ static int xs_watch(const char *path, co
   2.255  	iov[1].iov_base = (void *)token;
   2.256  	iov[1].iov_len = strlen(token) + 1;
   2.257  
   2.258 -	return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
   2.259 +	return xs_error(xs_talkv(NULL, XS_WATCH, iov,
   2.260 +				 ARRAY_SIZE(iov), NULL));
   2.261  }
   2.262  
   2.263  static int xs_unwatch(const char *path, const char *token)
   2.264 @@ -579,7 +556,8 @@ static int xs_unwatch(const char *path, 
   2.265  	iov[1].iov_base = (char *)token;
   2.266  	iov[1].iov_len = strlen(token) + 1;
   2.267  
   2.268 -	return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
   2.269 +	return xs_error(xs_talkv(NULL, XS_UNWATCH, iov,
   2.270 +				 ARRAY_SIZE(iov), NULL));
   2.271  }
   2.272  
   2.273  static struct xenbus_watch *find_watch(const char *token)
   2.274 @@ -604,6 +582,8 @@ int register_xenbus_watch(struct xenbus_
   2.275  
   2.276  	sprintf(token, "%lX", (long)watch);
   2.277  
   2.278 +	down_read(&xs_state.suspend_mutex);
   2.279 +
   2.280  	spin_lock(&watches_lock);
   2.281  	BUG_ON(find_watch(token));
   2.282  	spin_unlock(&watches_lock);
   2.283 @@ -617,6 +597,8 @@ int register_xenbus_watch(struct xenbus_
   2.284  		spin_unlock(&watches_lock);
   2.285  	}
   2.286  
   2.287 +	up_read(&xs_state.suspend_mutex);
   2.288 +
   2.289  	return err;
   2.290  }
   2.291  EXPORT_SYMBOL(register_xenbus_watch);
   2.292 @@ -628,20 +610,21 @@ void unregister_xenbus_watch(struct xenb
   2.293  
   2.294  	sprintf(token, "%lX", (long)watch);
   2.295  
   2.296 +	down_read(&xs_state.suspend_mutex);
   2.297 +
   2.298  	spin_lock(&watches_lock);
   2.299  	BUG_ON(!find_watch(token));
   2.300  	list_del(&watch->list);
   2.301  	spin_unlock(&watches_lock);
   2.302  
   2.303 -	/* Ensure xs_resume() is not in progress (see comments there). */
   2.304 -	wait_event(xs_resuming_waitq, !xs_resuming);
   2.305 -
   2.306  	err = xs_unwatch(watch->node, token);
   2.307  	if (err)
   2.308  		printk(KERN_WARNING
   2.309  		       "XENBUS Failed to release watch %s: %i\n",
   2.310  		       watch->node, err);
   2.311  
   2.312 +	up_read(&xs_state.suspend_mutex);
   2.313 +
   2.314  	/* Make sure watch is not in use. */
   2.315  	flush_scheduled_work();
   2.316  }
   2.317 @@ -649,58 +632,24 @@ EXPORT_SYMBOL(unregister_xenbus_watch);
   2.318  
   2.319  void xs_suspend(void)
   2.320  {
   2.321 -	down(&xs_state.transaction_mutex);
   2.322 +	down_write(&xs_state.suspend_mutex);
   2.323  	down(&xs_state.request_mutex);
   2.324  }
   2.325  
   2.326  void xs_resume(void)
   2.327  {
   2.328 -	struct list_head *ent, *prev_ent = &watches;
   2.329  	struct xenbus_watch *watch;
   2.330  	char token[sizeof(watch) * 2 + 1];
   2.331  
   2.332 -	/* Protect against concurrent unregistration and freeing of watches. */
   2.333 -	BUG_ON(xs_resuming);
   2.334 -	xs_resuming = 1;
   2.335 -
   2.336  	up(&xs_state.request_mutex);
   2.337 -	up(&xs_state.transaction_mutex);
   2.338  
   2.339 -	/*
   2.340 -	 * Iterate over the watch list re-registering each node. We must
   2.341 -	 * be careful about concurrent registrations and unregistrations.
   2.342 -	 * We search for the node immediately following the previously
   2.343 -	 * re-registered node. If we get no match then either we are done
   2.344 -	 * (previous node is last in list) or the node was unregistered, in
   2.345 -	 * which case we restart from the beginning of the list.
   2.346 -	 * register_xenbus_watch() + unregister_xenbus_watch() is safe because
   2.347 -	 * it will only ever move a watch node earlier in the list, so it
   2.348 -	 * cannot cause us to skip nodes.
   2.349 -	 */
   2.350 -	for (;;) {
   2.351 -		spin_lock(&watches_lock);
   2.352 -		list_for_each(ent, &watches)
   2.353 -			if (ent->prev == prev_ent)
   2.354 -				break;
   2.355 -		spin_unlock(&watches_lock);
   2.356 -
   2.357 -		/* No match because prev_ent is at the end of the list? */
   2.358 -		if ((ent == &watches) && (watches.prev == prev_ent))
   2.359 -			 break; /* We're done! */
   2.360 -
   2.361 -		if ((prev_ent = ent) != &watches) {
   2.362 -			/*
   2.363 -			 * Safe even with watch_lock not held. We are saved by
   2.364 -			 * (xs_resumed==1) check in unregister_xenbus_watch.
   2.365 -			 */
   2.366 -			watch = list_entry(ent, struct xenbus_watch, list);
   2.367 -			sprintf(token, "%lX", (long)watch);
   2.368 -			xs_watch(watch->node, token);
   2.369 -		}
   2.370 +	/* No need for watches_lock: the suspend_mutex is sufficient. */
   2.371 +	list_for_each_entry(watch, &watches, list) {
   2.372 +		sprintf(token, "%lX", (long)watch);
   2.373 +		xs_watch(watch->node, token);
   2.374  	}
   2.375  
   2.376 -	xs_resuming = 0;
   2.377 -	wake_up(&xs_resuming_waitq);
   2.378 +	up_write(&xs_state.suspend_mutex);
   2.379  }
   2.380  
   2.381  static void xenbus_fire_watch(void *arg)
   2.382 @@ -801,8 +750,7 @@ int xs_init(void)
   2.383  	init_waitqueue_head(&xs_state.reply_waitq);
   2.384  
   2.385  	init_MUTEX(&xs_state.request_mutex);
   2.386 -	init_MUTEX(&xs_state.transaction_mutex);
   2.387 -	xs_state.transaction_pid = -1;
   2.388 +	init_rwsem(&xs_state.suspend_mutex);
   2.389  
   2.390  	/* Initialize the shared memory rings to talk to xenstored */
   2.391  	err = xb_init_comms();
     3.1 --- a/tools/xenstore/utils.h	Mon Oct 10 14:46:53 2005 +0100
     3.2 +++ b/tools/xenstore/utils.h	Mon Oct 10 15:38:01 2005 +0100
     3.3 @@ -55,4 +55,34 @@ void xprintf(const char *fmt, ...);
     3.4  #define dprintf(_fmt, _args...) ((void)0)
     3.5  #endif
     3.6  
     3.7 +/*
     3.8 + * Mux errno values onto returned pointers.
     3.9 + */
    3.10 +
    3.11 +static inline void *ERR_PTR(long error)
    3.12 +{
    3.13 +	return (void *)error;
    3.14 +}
    3.15 +
    3.16 +static inline long PTR_ERR(const void *ptr)
    3.17 +{
    3.18 +	return (long)ptr;
    3.19 +}
    3.20 +
    3.21 +static inline long IS_ERR(const void *ptr)
    3.22 +{
    3.23 +	return ((unsigned long)ptr > (unsigned long)-1000L);
    3.24 +}
    3.25 +
    3.26 +
    3.27  #endif /* _UTILS_H */
    3.28 +
    3.29 +/*
    3.30 + * Local variables:
    3.31 + *  c-file-style: "linux"
    3.32 + *  indent-tabs-mode: t
    3.33 + *  c-indent-level: 8
    3.34 + *  c-basic-offset: 8
    3.35 + *  tab-width: 8
    3.36 + * End:
    3.37 + */
     4.1 --- a/tools/xenstore/xenstore_client.c	Mon Oct 10 14:46:53 2005 +0100
     4.2 +++ b/tools/xenstore/xenstore_client.c	Mon Oct 10 15:38:01 2005 +0100
     4.3 @@ -34,15 +34,11 @@ main(int argc, char **argv)
     4.4      struct xs_handle *xsh;
     4.5      struct xs_transaction_handle *xth;
     4.6      bool success;
     4.7 -    int ret = 0;
     4.8 +    int ret = 0, socket = 0;
     4.9  #if defined(CLIENT_read) || defined(CLIENT_list)
    4.10      int prefix = 0;
    4.11  #endif
    4.12  
    4.13 -    xsh = xs_domain_open();
    4.14 -    if (xsh == NULL)
    4.15 -	err(1, "xs_domain_open");
    4.16 -
    4.17      while (1) {
    4.18  	int c, index = 0;
    4.19  	static struct option long_options[] = {
    4.20 @@ -50,10 +46,11 @@ main(int argc, char **argv)
    4.21  #if defined(CLIENT_read) || defined(CLIENT_list)
    4.22  	    {"prefix", 0, 0, 'p'},
    4.23  #endif
    4.24 +            {"socket", 0, 0, 's'},
    4.25  	    {0, 0, 0, 0}
    4.26  	};
    4.27  
    4.28 -	c = getopt_long(argc, argv, "h"
    4.29 +	c = getopt_long(argc, argv, "hs"
    4.30  #if defined(CLIENT_read) || defined(CLIENT_list)
    4.31  			"p"
    4.32  #endif
    4.33 @@ -65,6 +62,9 @@ main(int argc, char **argv)
    4.34  	case 'h':
    4.35  	    usage(argv[0]);
    4.36  	    /* NOTREACHED */
    4.37 +        case 's':
    4.38 +            socket = 1;
    4.39 +            break;
    4.40  #if defined(CLIENT_read) || defined(CLIENT_list)
    4.41  	case 'p':
    4.42  	    prefix = 1;
    4.43 @@ -84,6 +84,10 @@ main(int argc, char **argv)
    4.44      }
    4.45  #endif
    4.46  
    4.47 +    xsh = socket ? xs_daemon_open() : xs_domain_open();
    4.48 +    if (xsh == NULL)
    4.49 +	err(1, socket ? "xs_daemon_open" : "xs_domain_open");
    4.50 +
    4.51    again:
    4.52      xth = xs_transaction_start(xsh);
    4.53      if (xth == NULL)
     5.1 --- a/tools/xenstore/xenstored_core.c	Mon Oct 10 14:46:53 2005 +0100
     5.2 +++ b/tools/xenstore/xenstored_core.c	Mon Oct 10 15:38:01 2005 +0100
     5.3 @@ -238,46 +238,47 @@ void trace(const char *fmt, ...)
     5.4  static bool write_messages(struct connection *conn)
     5.5  {
     5.6  	int ret;
     5.7 -	struct buffered_data *out, *tmp;
     5.8 +	struct buffered_data *out;
     5.9  
    5.10 -	list_for_each_entry_safe(out, tmp, &conn->out_list, list) {
    5.11 -		if (out->inhdr) {
    5.12 -			if (verbose)
    5.13 -				xprintf("Writing msg %s (%s) out to %p\n",
    5.14 -					sockmsg_string(out->hdr.msg.type),
    5.15 -					out->buffer, conn);
    5.16 -			ret = conn->write(conn, out->hdr.raw + out->used,
    5.17 -					  sizeof(out->hdr) - out->used);
    5.18 -			if (ret < 0)
    5.19 -				return false;
    5.20 +	out = list_top(&conn->out_list, struct buffered_data, list);
    5.21 +	if (out == NULL)
    5.22 +		return true;
    5.23  
    5.24 -			out->used += ret;
    5.25 -			if (out->used < sizeof(out->hdr))
    5.26 -				return true;
    5.27 -
    5.28 -			out->inhdr = false;
    5.29 -			out->used = 0;
    5.30 -
    5.31 -			/* Second write might block if non-zero. */
    5.32 -			if (out->hdr.msg.len && !conn->domain)
    5.33 -				return true;
    5.34 -		}
    5.35 -
    5.36 -		ret = conn->write(conn, out->buffer + out->used,
    5.37 -				  out->hdr.msg.len - out->used);
    5.38 -
    5.39 +	if (out->inhdr) {
    5.40 +		if (verbose)
    5.41 +			xprintf("Writing msg %s (%s) out to %p\n",
    5.42 +				sockmsg_string(out->hdr.msg.type),
    5.43 +				out->buffer, conn);
    5.44 +		ret = conn->write(conn, out->hdr.raw + out->used,
    5.45 +				  sizeof(out->hdr) - out->used);
    5.46  		if (ret < 0)
    5.47  			return false;
    5.48  
    5.49  		out->used += ret;
    5.50 -		if (out->used != out->hdr.msg.len)
    5.51 +		if (out->used < sizeof(out->hdr))
    5.52  			return true;
    5.53  
    5.54 -		trace_io(conn, "OUT", out);
    5.55 +		out->inhdr = false;
    5.56 +		out->used = 0;
    5.57 +
    5.58 +		/* Second write might block if non-zero. */
    5.59 +		if (out->hdr.msg.len && !conn->domain)
    5.60 +			return true;
    5.61 +	}
    5.62  
    5.63 -		list_del(&out->list);
    5.64 -		talloc_free(out);
    5.65 -	}
    5.66 +	ret = conn->write(conn, out->buffer + out->used,
    5.67 +			  out->hdr.msg.len - out->used);
    5.68 +	if (ret < 0)
    5.69 +		return false;
    5.70 +
    5.71 +	out->used += ret;
    5.72 +	if (out->used != out->hdr.msg.len)
    5.73 +		return true;
    5.74 +
    5.75 +	trace_io(conn, "OUT", out);
    5.76 +
    5.77 +	list_del(&out->list);
    5.78 +	talloc_free(out);
    5.79  
    5.80  	return true;
    5.81  }
    5.82 @@ -1042,6 +1043,17 @@ static void do_set_perms(struct connecti
    5.83   */
    5.84  static void process_message(struct connection *conn, struct buffered_data *in)
    5.85  {
    5.86 +	struct transaction *trans;
    5.87 +
    5.88 +	trans = transaction_lookup(conn, in->hdr.msg.tx_id);
    5.89 +	if (IS_ERR(trans)) {
    5.90 +		send_error(conn, -PTR_ERR(trans));
    5.91 +		return;
    5.92 +	}
    5.93 +
    5.94 +	assert(conn->transaction == NULL);
    5.95 +	conn->transaction = trans;
    5.96 +
    5.97  	switch (in->hdr.msg.type) {
    5.98  	case XS_DIRECTORY:
    5.99  		send_directory(conn, onearg(in));
   5.100 @@ -1116,11 +1128,13 @@ static void process_message(struct conne
   5.101  		do_get_domain_path(conn, onearg(in));
   5.102  		break;
   5.103  
   5.104 -	case XS_WATCH_EVENT:
   5.105  	default:
   5.106  		eprintf("Client unknown operation %i", in->hdr.msg.type);
   5.107  		send_error(conn, ENOSYS);
   5.108 +		break;
   5.109  	}
   5.110 +
   5.111 +	conn->transaction = NULL;
   5.112  }
   5.113  
   5.114  static int out_of_mem(void *data)
   5.115 @@ -1239,15 +1253,14 @@ struct connection *new_connection(connwr
   5.116  	if (!new)
   5.117  		return NULL;
   5.118  
   5.119 +	memset(new, 0, sizeof(*new));
   5.120  	new->fd = -1;
   5.121 -	new->id = 0;
   5.122 -	new->domain = NULL;
   5.123 -	new->transaction = NULL;
   5.124  	new->write = write;
   5.125  	new->read = read;
   5.126  	new->can_write = true;
   5.127  	INIT_LIST_HEAD(&new->out_list);
   5.128  	INIT_LIST_HEAD(&new->watches);
   5.129 +	INIT_LIST_HEAD(&new->transaction_list);
   5.130  
   5.131  	talloc_set_fail_handler(out_of_mem, &talloc_fail);
   5.132  	if (setjmp(talloc_fail)) {
   5.133 @@ -1410,6 +1423,7 @@ static void daemonize(void)
   5.134  
   5.135  
   5.136  static struct option options[] = {
   5.137 +	{ "no-domain-init", 0, NULL, 'D' },
   5.138  	{ "pid-file", 1, NULL, 'F' },
   5.139  	{ "no-fork", 0, NULL, 'N' },
   5.140  	{ "output-pid", 0, NULL, 'P' },
   5.141 @@ -1424,11 +1438,15 @@ int main(int argc, char *argv[])
   5.142  	fd_set inset, outset;
   5.143  	bool dofork = true;
   5.144  	bool outputpid = false;
   5.145 +	bool no_domain_init = false;
   5.146  	const char *pidfile = NULL;
   5.147  
   5.148 -	while ((opt = getopt_long(argc, argv, "F:NPT:V", options,
   5.149 +	while ((opt = getopt_long(argc, argv, "DF:NPT:V", options,
   5.150  				  NULL)) != -1) {
   5.151  		switch (opt) {
   5.152 +		case 'D':
   5.153 +			no_domain_init = true;
   5.154 +			break;
   5.155  		case 'F':
   5.156  			pidfile = optarg;
   5.157  			break;
   5.158 @@ -1501,7 +1519,8 @@ int main(int argc, char *argv[])
   5.159  	setup_structure();
   5.160  
   5.161  	/* Listen to hypervisor. */
   5.162 -	event_fd = domain_init();
   5.163 +	if (!no_domain_init)
   5.164 +		event_fd = domain_init();
   5.165  
   5.166  	/* Restore existing connections. */
   5.167  	restore_existing_connections();
     6.1 --- a/tools/xenstore/xenstored_core.h	Mon Oct 10 14:46:53 2005 +0100
     6.2 +++ b/tools/xenstore/xenstored_core.h	Mon Oct 10 15:38:01 2005 +0100
     6.3 @@ -71,9 +71,13 @@ struct connection
     6.4  	/* Buffered output data */
     6.5  	struct list_head out_list;
     6.6  
     6.7 -	/* My transaction, if any. */
     6.8 +	/* Transaction context for current request (NULL if none). */
     6.9  	struct transaction *transaction;
    6.10  
    6.11 +	/* List of in-progress transactions. */
    6.12 +	struct list_head transaction_list;
    6.13 +	u32 next_transaction_id;
    6.14 +
    6.15  	/* The domain I'm associated with, if any. */
    6.16  	struct domain *domain;
    6.17  
     7.1 --- a/tools/xenstore/xenstored_transaction.c	Mon Oct 10 14:46:53 2005 +0100
     7.2 +++ b/tools/xenstore/xenstored_transaction.c	Mon Oct 10 15:38:01 2005 +0100
     7.3 @@ -37,7 +37,7 @@
     7.4  
     7.5  struct changed_node
     7.6  {
     7.7 -	/* The list within this transaction. */
     7.8 +	/* List of all changed nodes in the context of this transaction. */
     7.9  	struct list_head list;
    7.10  
    7.11  	/* The name of the node. */
    7.12 @@ -49,15 +49,15 @@ struct changed_node
    7.13  
    7.14  struct transaction
    7.15  {
    7.16 -	/* Global list of transactions. */
    7.17 +	/* List of all transactions active on this connection. */
    7.18  	struct list_head list;
    7.19  
    7.20 +	/* Connection-local identifier for this transaction. */
    7.21 +	u32 id;
    7.22 +
    7.23  	/* Generation when transaction started. */
    7.24  	unsigned int generation;
    7.25  
    7.26 -	/* My owner (conn->transaction == me). */
    7.27 -	struct connection *conn;
    7.28 -
    7.29  	/* TDB to work on, and filename */
    7.30  	TDB_CONTEXT *tdb;
    7.31  	char *tdb_name;
    7.32 @@ -65,7 +65,7 @@ struct transaction
    7.33  	/* List of changed nodes. */
    7.34  	struct list_head changes;
    7.35  };
    7.36 -static LIST_HEAD(transactions);
    7.37 +
    7.38  static unsigned int generation;
    7.39  
    7.40  /* Return tdb context to use for this connection. */
    7.41 @@ -100,7 +100,6 @@ static int destroy_transaction(void *_tr
    7.42  {
    7.43  	struct transaction *trans = _transaction;
    7.44  
    7.45 -	list_del(&trans->list);
    7.46  	trace_destroy(trans, "transaction");
    7.47  	if (trans->tdb)
    7.48  		tdb_close(trans->tdb);
    7.49 @@ -108,10 +107,26 @@ static int destroy_transaction(void *_tr
    7.50  	return 0;
    7.51  }
    7.52  
    7.53 -void do_transaction_start(struct connection *conn, struct buffered_data *in)
    7.54 +struct transaction *transaction_lookup(struct connection *conn, u32 id)
    7.55  {
    7.56  	struct transaction *trans;
    7.57  
    7.58 +	if (id == 0)
    7.59 +		return NULL;
    7.60 +
    7.61 +	list_for_each_entry(trans, &conn->transaction_list, list)
    7.62 +		if (trans->id == id)
    7.63 +			return trans;
    7.64 +
    7.65 +	return ERR_PTR(-ENOENT);
    7.66 +}
    7.67 +
    7.68 +void do_transaction_start(struct connection *conn, struct buffered_data *in)
    7.69 +{
    7.70 +	struct transaction *trans, *exists;
    7.71 +	char id_str[20];
    7.72 +
    7.73 +	/* We don't support nested transactions. */
    7.74  	if (conn->transaction) {
    7.75  		send_error(conn, EBUSY);
    7.76  		return;
    7.77 @@ -120,7 +135,6 @@ void do_transaction_start(struct connect
    7.78  	/* Attach transaction to input for autofree until it's complete */
    7.79  	trans = talloc(in, struct transaction);
    7.80  	INIT_LIST_HEAD(&trans->changes);
    7.81 -	trans->conn = conn;
    7.82  	trans->generation = generation;
    7.83  	trans->tdb_name = talloc_asprintf(trans, "%s.%p",
    7.84  					  xs_daemon_tdb(), trans);
    7.85 @@ -132,11 +146,19 @@ void do_transaction_start(struct connect
    7.86  	/* Make it close if we go away. */
    7.87  	talloc_steal(trans, trans->tdb);
    7.88  
    7.89 +	/* Pick an unused transaction identifier. */
    7.90 +	do {
    7.91 +		trans->id = conn->next_transaction_id;
    7.92 +		exists = transaction_lookup(conn, conn->next_transaction_id++);
    7.93 +	} while (!IS_ERR(exists));
    7.94 +
    7.95  	/* Now we own it. */
    7.96 -	conn->transaction = talloc_steal(conn, trans);
    7.97 -	list_add_tail(&trans->list, &transactions);
    7.98 +	list_add_tail(&trans->list, &conn->transaction_list);
    7.99 +	talloc_steal(conn, trans);
   7.100  	talloc_set_destructor(trans, destroy_transaction);
   7.101 -	send_ack(conn, XS_TRANSACTION_START);
   7.102 +
   7.103 +	sprintf(id_str, "%u", trans->id);
   7.104 +	send_reply(conn, XS_TRANSACTION_START, id_str, strlen(id_str)+1);
   7.105  }
   7.106  
   7.107  void do_transaction_end(struct connection *conn, const char *arg)
   7.108 @@ -149,13 +171,13 @@ void do_transaction_end(struct connectio
   7.109  		return;
   7.110  	}
   7.111  
   7.112 -	if (!conn->transaction) {
   7.113 +	if ((trans = conn->transaction) == NULL) {
   7.114  		send_error(conn, ENOENT);
   7.115  		return;
   7.116  	}
   7.117  
   7.118 -	trans = conn->transaction;
   7.119  	conn->transaction = NULL;
   7.120 +	list_del(&trans->list);
   7.121  
   7.122  	/* Attach transaction to arg for auto-cleanup */
   7.123  	talloc_steal(arg, trans);
   7.124 @@ -181,3 +203,12 @@ void do_transaction_end(struct connectio
   7.125  	send_ack(conn, XS_TRANSACTION_END);
   7.126  }
   7.127  
   7.128 +/*
   7.129 + * Local variables:
   7.130 + *  c-file-style: "linux"
   7.131 + *  indent-tabs-mode: t
   7.132 + *  c-indent-level: 8
   7.133 + *  c-basic-offset: 8
   7.134 + *  tab-width: 8
   7.135 + * End:
   7.136 + */
     8.1 --- a/tools/xenstore/xenstored_transaction.h	Mon Oct 10 14:46:53 2005 +0100
     8.2 +++ b/tools/xenstore/xenstored_transaction.h	Mon Oct 10 15:38:01 2005 +0100
     8.3 @@ -25,10 +25,11 @@ struct transaction;
     8.4  void do_transaction_start(struct connection *conn, struct buffered_data *node);
     8.5  void do_transaction_end(struct connection *conn, const char *arg);
     8.6  
     8.7 -bool transaction_block(struct connection *conn);
     8.8 +struct transaction *transaction_lookup(struct connection *conn, u32 id);
     8.9  
    8.10  /* This node was changed: can fail and longjmp. */
    8.11 -void add_change_node(struct transaction *trans, const char *node, bool recurse);
    8.12 +void add_change_node(struct transaction *trans, const char *node,
    8.13 +                     bool recurse);
    8.14  
    8.15  /* Return tdb context to use for this connection. */
    8.16  TDB_CONTEXT *tdb_transaction_context(struct transaction *trans);
     9.1 --- a/tools/xenstore/xs.c	Mon Oct 10 14:46:53 2005 +0100
     9.2 +++ b/tools/xenstore/xs.c	Mon Oct 10 15:38:01 2005 +0100
     9.3 @@ -75,37 +75,10 @@ struct xs_handle {
     9.4  
     9.5  	/* One request at a time. */
     9.6  	pthread_mutex_t request_mutex;
     9.7 -
     9.8 -	/* One transaction at a time. */
     9.9 -	pthread_mutex_t transaction_mutex;
    9.10 -	pthread_t transaction_pthread;
    9.11 -};
    9.12 -
    9.13 -struct xs_transaction_handle {
    9.14 -	int id;
    9.15  };
    9.16  
    9.17  static void *read_thread(void *arg);
    9.18  
    9.19 -static void request_mutex_acquire(struct xs_handle *h)
    9.20 -{
    9.21 -	/*
    9.22 -	 * We can't distinguish non-transactional from transactional
    9.23 -	 * requests right now. So temporarily acquire the transaction mutex
    9.24 -	 * if this task is outside transaction context.
    9.25 - 	 */
    9.26 -	if (h->transaction_pthread != pthread_self())
    9.27 -		pthread_mutex_lock(&h->transaction_mutex);
    9.28 -	pthread_mutex_lock(&h->request_mutex);
    9.29 -}
    9.30 -
    9.31 -static void request_mutex_release(struct xs_handle *h)
    9.32 -{
    9.33 -	pthread_mutex_unlock(&h->request_mutex);
    9.34 -	if (h->transaction_pthread != pthread_self())
    9.35 -		pthread_mutex_unlock(&h->transaction_mutex);
    9.36 -}
    9.37 -
    9.38  int xs_fileno(struct xs_handle *h)
    9.39  {
    9.40  	char c = 0;
    9.41 @@ -186,8 +159,6 @@ static struct xs_handle *get_handle(cons
    9.42  	pthread_cond_init(&h->reply_condvar, NULL);
    9.43  
    9.44  	pthread_mutex_init(&h->request_mutex, NULL);
    9.45 -	pthread_mutex_init(&h->transaction_mutex, NULL);
    9.46 -	h->transaction_pthread = -1;
    9.47  
    9.48  	if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
    9.49  		goto error;
    9.50 @@ -223,7 +194,6 @@ void xs_daemon_close(struct xs_handle *h
    9.51  {
    9.52  	struct xs_stored_msg *msg, *tmsg;
    9.53  
    9.54 -	pthread_mutex_lock(&h->transaction_mutex);
    9.55  	pthread_mutex_lock(&h->request_mutex);
    9.56  	pthread_mutex_lock(&h->reply_mutex);
    9.57  	pthread_mutex_lock(&h->watch_mutex);
    9.58 @@ -242,7 +212,6 @@ void xs_daemon_close(struct xs_handle *h
    9.59  		free(msg);
    9.60  	}
    9.61  
    9.62 -	pthread_mutex_unlock(&h->transaction_mutex);
    9.63  	pthread_mutex_unlock(&h->request_mutex);
    9.64  	pthread_mutex_unlock(&h->reply_mutex);
    9.65  	pthread_mutex_unlock(&h->watch_mutex);
    9.66 @@ -321,8 +290,10 @@ static void *read_reply(
    9.67  }
    9.68  
    9.69  /* Send message to xs, get malloc'ed reply.  NULL and set errno on error. */
    9.70 -static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type,
    9.71 -		      const struct iovec *iovec, unsigned int num_vecs,
    9.72 +static void *xs_talkv(struct xs_handle *h, struct xs_transaction_handle *t,
    9.73 +		      enum xsd_sockmsg_type type,
    9.74 +		      const struct iovec *iovec,
    9.75 +		      unsigned int num_vecs,
    9.76  		      unsigned int *len)
    9.77  {
    9.78  	struct xsd_sockmsg msg;
    9.79 @@ -331,6 +302,7 @@ static void *xs_talkv(struct xs_handle *
    9.80  	unsigned int i;
    9.81  	struct sigaction ignorepipe, oldact;
    9.82  
    9.83 +	msg.tx_id = (u32)(unsigned long)t;
    9.84  	msg.type = type;
    9.85  	msg.len = 0;
    9.86  	for (i = 0; i < num_vecs; i++)
    9.87 @@ -341,7 +313,7 @@ static void *xs_talkv(struct xs_handle *
    9.88  	ignorepipe.sa_flags = 0;
    9.89  	sigaction(SIGPIPE, &ignorepipe, &oldact);
    9.90  
    9.91 -	request_mutex_acquire(h);
    9.92 +	pthread_mutex_lock(&h->request_mutex);
    9.93  
    9.94  	if (!xs_write_all(h->fd, &msg, sizeof(msg)))
    9.95  		goto fail;
    9.96 @@ -354,7 +326,7 @@ static void *xs_talkv(struct xs_handle *
    9.97  	if (!ret)
    9.98  		goto fail;
    9.99  
   9.100 -	request_mutex_release(h);
   9.101 +	pthread_mutex_unlock(&h->request_mutex);
   9.102  
   9.103  	sigaction(SIGPIPE, &oldact, NULL);
   9.104  	if (msg.type == XS_ERROR) {
   9.105 @@ -375,7 +347,7 @@ static void *xs_talkv(struct xs_handle *
   9.106  fail:
   9.107  	/* We're in a bad state, so close fd. */
   9.108  	saved_errno = errno;
   9.109 -	request_mutex_release(h);
   9.110 +	pthread_mutex_unlock(&h->request_mutex);
   9.111  	sigaction(SIGPIPE, &oldact, NULL);
   9.112  close_fd:
   9.113  	close(h->fd);
   9.114 @@ -393,14 +365,16 @@ static void free_no_errno(void *p)
   9.115  }
   9.116  
   9.117  /* Simplified version of xs_talkv: single message. */
   9.118 -static void *xs_single(struct xs_handle *h, enum xsd_sockmsg_type type,
   9.119 -		       const char *string, unsigned int *len)
   9.120 +static void *xs_single(struct xs_handle *h, struct xs_transaction_handle *t,
   9.121 +		       enum xsd_sockmsg_type type,
   9.122 +		       const char *string,
   9.123 +		       unsigned int *len)
   9.124  {
   9.125  	struct iovec iovec;
   9.126  
   9.127  	iovec.iov_base = (void *)string;
   9.128  	iovec.iov_len = strlen(string) + 1;
   9.129 -	return xs_talkv(h, type, &iovec, 1, len);
   9.130 +	return xs_talkv(h, t, type, &iovec, 1, len);
   9.131  }
   9.132  
   9.133  static bool xs_bool(char *reply)
   9.134 @@ -417,7 +391,7 @@ char **xs_directory(struct xs_handle *h,
   9.135  	char *strings, *p, **ret;
   9.136  	unsigned int len;
   9.137  
   9.138 -	strings = xs_single(h, XS_DIRECTORY, path, &len);
   9.139 +	strings = xs_single(h, t, XS_DIRECTORY, path, &len);
   9.140  	if (!strings)
   9.141  		return NULL;
   9.142  
   9.143 @@ -446,7 +420,7 @@ char **xs_directory(struct xs_handle *h,
   9.144  void *xs_read(struct xs_handle *h, struct xs_transaction_handle *t,
   9.145  	      const char *path, unsigned int *len)
   9.146  {
   9.147 -	return xs_single(h, XS_READ, path, len);
   9.148 +	return xs_single(h, t, XS_READ, path, len);
   9.149  }
   9.150  
   9.151  /* Write the value of a single file.
   9.152 @@ -462,7 +436,8 @@ bool xs_write(struct xs_handle *h, struc
   9.153  	iovec[1].iov_base = (void *)data;
   9.154  	iovec[1].iov_len = len;
   9.155  
   9.156 -	return xs_bool(xs_talkv(h, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
   9.157 +	return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
   9.158 +				ARRAY_SIZE(iovec), NULL));
   9.159  }
   9.160  
   9.161  /* Create a new directory.
   9.162 @@ -471,7 +446,7 @@ bool xs_write(struct xs_handle *h, struc
   9.163  bool xs_mkdir(struct xs_handle *h, struct xs_transaction_handle *t,
   9.164  	      const char *path)
   9.165  {
   9.166 -	return xs_bool(xs_single(h, XS_MKDIR, path, NULL));
   9.167 +	return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
   9.168  }
   9.169  
   9.170  /* Destroy a file or directory (directories must be empty).
   9.171 @@ -480,7 +455,7 @@ bool xs_mkdir(struct xs_handle *h, struc
   9.172  bool xs_rm(struct xs_handle *h, struct xs_transaction_handle *t,
   9.173  	   const char *path)
   9.174  {
   9.175 -	return xs_bool(xs_single(h, XS_RM, path, NULL));
   9.176 +	return xs_bool(xs_single(h, t, XS_RM, path, NULL));
   9.177  }
   9.178  
   9.179  /* Get permissions of node (first element is owner).
   9.180 @@ -494,7 +469,7 @@ struct xs_permissions *xs_get_permission
   9.181  	unsigned int len;
   9.182  	struct xs_permissions *ret;
   9.183  
   9.184 -	strings = xs_single(h, XS_GET_PERMS, path, &len);
   9.185 +	strings = xs_single(h, t, XS_GET_PERMS, path, &len);
   9.186  	if (!strings)
   9.187  		return NULL;
   9.188  
   9.189 @@ -544,7 +519,7 @@ bool xs_set_permissions(struct xs_handle
   9.190  			goto unwind;
   9.191  	}
   9.192  
   9.193 -	if (!xs_bool(xs_talkv(h, XS_SET_PERMS, iov, 1+num_perms, NULL)))
   9.194 +	if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
   9.195  		goto unwind;
   9.196  	for (i = 0; i < num_perms; i++)
   9.197  		free(iov[i+1].iov_base);
   9.198 @@ -571,7 +546,8 @@ bool xs_watch(struct xs_handle *h, const
   9.199  	iov[1].iov_base = (void *)token;
   9.200  	iov[1].iov_len = strlen(token) + 1;
   9.201  
   9.202 -	return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
   9.203 +	return xs_bool(xs_talkv(h, NULL, XS_WATCH, iov,
   9.204 +				ARRAY_SIZE(iov), NULL));
   9.205  }
   9.206  
   9.207  /* Find out what node change was on (will block if nothing pending).
   9.208 @@ -637,7 +613,8 @@ bool xs_unwatch(struct xs_handle *h, con
   9.209  	iov[1].iov_base = (char *)token;
   9.210  	iov[1].iov_len = strlen(token) + 1;
   9.211  
   9.212 -	return xs_bool(xs_talkv(h, XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
   9.213 +	return xs_bool(xs_talkv(h, NULL, XS_UNWATCH, iov,
   9.214 +				ARRAY_SIZE(iov), NULL));
   9.215  }
   9.216  
   9.217  /* Start a transaction: changes by others will not be seen during this
   9.218 @@ -647,18 +624,17 @@ bool xs_unwatch(struct xs_handle *h, con
   9.219   */
   9.220  struct xs_transaction_handle *xs_transaction_start(struct xs_handle *h)
   9.221  {
   9.222 -	bool rc;
   9.223 -
   9.224 -	pthread_mutex_lock(&h->transaction_mutex);
   9.225 -	h->transaction_pthread = pthread_self();
   9.226 +	char *id_str;
   9.227 +	unsigned long id;
   9.228  
   9.229 -	rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
   9.230 -	if (!rc) {
   9.231 -		h->transaction_pthread = -1;
   9.232 -		pthread_mutex_unlock(&h->transaction_mutex);
   9.233 -	}
   9.234 +	id_str = xs_single(h, NULL, XS_TRANSACTION_START, "", NULL);
   9.235 +	if (id_str == NULL)
   9.236 +		return NULL;
   9.237  
   9.238 -	return (struct xs_transaction_handle *)rc;
   9.239 +	id = strtoul(id_str, NULL, 0);
   9.240 +	free(id_str);
   9.241 +
   9.242 +	return (struct xs_transaction_handle *)id;
   9.243  }
   9.244  
   9.245  /* End a transaction.
   9.246 @@ -670,22 +646,13 @@ bool xs_transaction_end(struct xs_handle
   9.247  			bool abort)
   9.248  {
   9.249  	char abortstr[2];
   9.250 -	bool rc;
   9.251 -
   9.252 -	if (t == NULL)
   9.253 -		return -EINVAL;
   9.254  
   9.255  	if (abort)
   9.256  		strcpy(abortstr, "F");
   9.257  	else
   9.258  		strcpy(abortstr, "T");
   9.259  	
   9.260 -	rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
   9.261 -
   9.262 -	h->transaction_pthread = -1;
   9.263 -	pthread_mutex_unlock(&h->transaction_mutex);
   9.264 -
   9.265 -	return rc;
   9.266 +	return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
   9.267  }
   9.268  
   9.269  /* Introduce a new domain.
   9.270 @@ -713,7 +680,8 @@ bool xs_introduce_domain(struct xs_handl
   9.271  	iov[3].iov_base = (char *)path;
   9.272  	iov[3].iov_len = strlen(path) + 1;
   9.273  
   9.274 -	return xs_bool(xs_talkv(h, XS_INTRODUCE, iov, ARRAY_SIZE(iov), NULL));
   9.275 +	return xs_bool(xs_talkv(h, NULL, XS_INTRODUCE, iov,
   9.276 +				ARRAY_SIZE(iov), NULL));
   9.277  }
   9.278  
   9.279  bool xs_release_domain(struct xs_handle *h, domid_t domid)
   9.280 @@ -722,7 +690,7 @@ bool xs_release_domain(struct xs_handle 
   9.281  
   9.282  	sprintf(domid_str, "%u", domid);
   9.283  
   9.284 -	return xs_bool(xs_single(h, XS_RELEASE, domid_str, NULL));
   9.285 +	return xs_bool(xs_single(h, NULL, XS_RELEASE, domid_str, NULL));
   9.286  }
   9.287  
   9.288  char *xs_get_domain_path(struct xs_handle *h, domid_t domid)
   9.289 @@ -731,7 +699,7 @@ char *xs_get_domain_path(struct xs_handl
   9.290  
   9.291  	sprintf(domid_str, "%u", domid);
   9.292  
   9.293 -	return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
   9.294 +	return xs_single(h, NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
   9.295  }
   9.296  
   9.297  /* Only useful for DEBUG versions */
   9.298 @@ -745,7 +713,8 @@ char *xs_debug_command(struct xs_handle 
   9.299  	iov[1].iov_base = data;
   9.300  	iov[1].iov_len = len;
   9.301  
   9.302 -	return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
   9.303 +	return xs_talkv(h, NULL, XS_DEBUG, iov,
   9.304 +			ARRAY_SIZE(iov), NULL);
   9.305  }
   9.306  
   9.307  static void *read_thread(void *arg)