ia64/xen-unstable

changeset 7277:2144de6eabcc

Make libxenstore thread-safe. It also spawns an internal
thread to read messages from the comms channel.

Signed-off-by: Keir Fraser <keir@xensource.com>
author kaf24@firebug.cl.cam.ac.uk
date Sat Oct 08 19:19:27 2005 +0100 (2005-10-08)
parents e69413dca684
children 4e0c94871be2 17f110647efa
files tools/python/xen/lowlevel/xs/xs.c tools/python/xen/xend/xenstore/xsutil.py tools/python/xen/xend/xenstore/xswatch.py tools/xenstore/Makefile tools/xenstore/testsuite/12readonly.test tools/xenstore/testsuite/test.sh tools/xenstore/xenstored_core.c tools/xenstore/xs.c tools/xenstore/xs.h tools/xenstore/xs_random.c tools/xenstore/xs_test.c xen/include/public/io/xs_wire.h
line diff
     1.1 --- a/tools/python/xen/lowlevel/xs/xs.c	Sat Oct 08 10:22:01 2005 +0100
     1.2 +++ b/tools/python/xen/lowlevel/xs/xs.c	Sat Oct 08 19:19:27 2005 +0100
     1.3 @@ -775,39 +775,6 @@ static PyObject *xspy_close(PyObject *se
     1.4      return val;
     1.5  }
     1.6  
     1.7 -#define xspy_shutdown_doc "\n"			\
     1.8 -	"Shutdown the xenstore daemon.\n"	\
     1.9 -	"\n"					\
    1.10 -	"Returns None on success.\n"		\
    1.11 -	"Raises RuntimeError on error.\n"	\
    1.12 -	"\n"
    1.13 -
    1.14 -static PyObject *xspy_shutdown(PyObject *self, PyObject *args, PyObject *kwds)
    1.15 -{
    1.16 -    static char *kwd_spec[] = { NULL };
    1.17 -    static char *arg_spec = "";
    1.18 -
    1.19 -    struct xs_handle *xh = xshandle(self);
    1.20 -    PyObject *val = NULL;
    1.21 -    int xsval = 0;
    1.22 -
    1.23 -    if (!xh)
    1.24 -        goto exit;
    1.25 -    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
    1.26 -        goto exit;
    1.27 -    Py_BEGIN_ALLOW_THREADS
    1.28 -    xsval = xs_shutdown(xh);
    1.29 -    Py_END_ALLOW_THREADS
    1.30 -    if (!xsval) {
    1.31 -        PyErr_SetFromErrno(PyExc_RuntimeError);
    1.32 -        goto exit;
    1.33 -    }
    1.34 -    Py_INCREF(Py_None);
    1.35 -    val = Py_None;
    1.36 - exit:
    1.37 -    return val;
    1.38 -}
    1.39 -
    1.40  #define xspy_get_domain_path_doc "\n"			\
    1.41  	"Return store path of domain.\n"		\
    1.42  	" domid [int]: domain id\n"			\
    1.43 @@ -850,28 +817,6 @@ static PyObject *xspy_get_domain_path(Py
    1.44      return val;
    1.45  }
    1.46  
    1.47 -#define xspy_fileno_doc "\n"					\
    1.48 -	"Get the file descriptor of the xenstore socket.\n"	\
    1.49 -	"Allows an xs object to be passed to select().\n"	\
    1.50 -	"\n"							\
    1.51 -	"Returns: [int] file descriptor.\n"			\
    1.52 -	"\n"
    1.53 -
    1.54 -static PyObject *xspy_fileno(PyObject *self, PyObject *args, PyObject *kwds)
    1.55 -{
    1.56 -    static char *kwd_spec[] = { NULL };
    1.57 -    static char *arg_spec = "";
    1.58 -
    1.59 -    struct xs_handle *xh = xshandle(self);
    1.60 -    PyObject *val = NULL;
    1.61 -
    1.62 -    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
    1.63 -        goto exit;
    1.64 -    val = PyInt_FromLong((xh ? xs_fileno(xh) : -1));
    1.65 - exit:
    1.66 -    return val;
    1.67 -}
    1.68 -
    1.69  #define XSPY_METH(_name) {			\
    1.70      .ml_name  = #_name,				\
    1.71      .ml_meth  = (PyCFunction) xspy_ ## _name,	\
    1.72 @@ -895,9 +840,7 @@ static PyMethodDef xshandle_methods[] = 
    1.73       XSPY_METH(introduce_domain),
    1.74       XSPY_METH(release_domain),
    1.75       XSPY_METH(close),
    1.76 -     XSPY_METH(shutdown),
    1.77       XSPY_METH(get_domain_path),
    1.78 -     XSPY_METH(fileno),
    1.79       { /* Terminator. */ },
    1.80  };
    1.81  
     2.1 --- a/tools/python/xen/xend/xenstore/xsutil.py	Sat Oct 08 10:22:01 2005 +0100
     2.2 +++ b/tools/python/xen/xend/xenstore/xsutil.py	Sat Oct 08 19:19:27 2005 +0100
     2.3 @@ -7,14 +7,17 @@
     2.4  import threading
     2.5  from xen.lowlevel import xs
     2.6  
     2.7 -handles = {}
     2.8 +xs_lock = threading.Lock()
     2.9 +xs_handle = None
    2.10  
    2.11 -# XXX need to g/c handles from dead threads
    2.12  def xshandle():
    2.13 -    if not handles.has_key(threading.currentThread()):
    2.14 -        handles[threading.currentThread()] = xs.open()
    2.15 -    return handles[threading.currentThread()]
    2.16 -
    2.17 +    global xs_handle, xs_lock
    2.18 +    if not xs_handle:
    2.19 +        xs_lock.acquire()
    2.20 +        if not xs_handle:
    2.21 +            xs_handle = xs.open()
    2.22 +        xs_lock.release()
    2.23 +    return xs_handle
    2.24  
    2.25  def IntroduceDomain(domid, page, port, path):
    2.26      return xshandle().introduce_domain(domid, page, port, path)
     3.1 --- a/tools/python/xen/xend/xenstore/xswatch.py	Sat Oct 08 10:22:01 2005 +0100
     3.2 +++ b/tools/python/xen/xend/xenstore/xswatch.py	Sat Oct 08 19:19:27 2005 +0100
     3.3 @@ -12,7 +12,6 @@ from xen.lowlevel import xs
     3.4  class xswatch:
     3.5  
     3.6      watchThread = None
     3.7 -    threadcond = threading.Condition()
     3.8      xs = None
     3.9      xslock = threading.Lock()
    3.10      
    3.11 @@ -21,43 +20,31 @@ class xswatch:
    3.12          self.args = args
    3.13          self.kwargs = kwargs
    3.14          xswatch.watchStart()
    3.15 -        xswatch.xslock.acquire()
    3.16          xswatch.xs.watch(path, self)
    3.17 -        xswatch.xslock.release()
    3.18  
    3.19      def watchStart(cls):
    3.20 -        cls.threadcond.acquire()
    3.21 +        cls.xslock.acquire()
    3.22          if cls.watchThread:
    3.23 -            cls.threadcond.release()
    3.24 +            cls.xslock.release()
    3.25              return
    3.26 +        # XXX: When we fix xenstored to have better watch semantics,
    3.27 +        # this can change to shared xshandle(). Currently that would result
    3.28 +        # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
    3.29 +        cls.xs = xs.open()
    3.30          cls.watchThread = threading.Thread(name="Watcher",
    3.31                                             target=cls.watchMain)
    3.32          cls.watchThread.setDaemon(True)
    3.33          cls.watchThread.start()
    3.34 -        while cls.xs == None:
    3.35 -            cls.threadcond.wait()
    3.36 -        cls.threadcond.release()
    3.37 +        cls.xslock.release()
    3.38  
    3.39      watchStart = classmethod(watchStart)
    3.40  
    3.41      def watchMain(cls):
    3.42 -        cls.threadcond.acquire()
    3.43 -        cls.xs = xs.open()
    3.44 -        cls.threadcond.notifyAll()
    3.45 -        cls.threadcond.release()
    3.46          while True:
    3.47              try:
    3.48 -                (fd, _1, _2) = select.select([ cls.xs ], [], [])
    3.49 -                cls.xslock.acquire()
    3.50 -                # reconfirm ready to read with lock
    3.51 -                (fd, _1, _2) = select.select([ cls.xs ], [], [], 0.001)
    3.52 -                if not cls.xs in fd:
    3.53 -                    cls.xslock.release()
    3.54 -                    continue
    3.55                  we = cls.xs.read_watch()
    3.56                  watch = we[1]
    3.57                  cls.xs.acknowledge_watch(watch)
    3.58 -                cls.xslock.release()
    3.59              except RuntimeError, ex:
    3.60                  print ex
    3.61                  raise
     4.1 --- a/tools/xenstore/Makefile	Sat Oct 08 10:22:01 2005 +0100
     4.2 +++ b/tools/xenstore/Makefile	Sat Oct 08 19:19:27 2005 +0100
     4.3 @@ -34,7 +34,6 @@ testcode: xs_test xenstored_test xs_rand
     4.4  xenstored: xenstored_core.o xenstored_watch.o xenstored_domain.o xenstored_transaction.o xs_lib.o talloc.o utils.o tdb.o
     4.5  	$(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -o $@
     4.6  
     4.7 -$(CLIENTS): libxenstore.so
     4.8  $(CLIENTS): xenstore-%: xenstore_%.o
     4.9  	$(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -L. -lxenstore -o $@
    4.10  
    4.11 @@ -47,6 +46,7 @@ xenstored_test: xenstored_core_test.o xe
    4.12  xs_tdb_dump: xs_tdb_dump.o utils.o tdb.o talloc.o
    4.13  	$(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@
    4.14  
    4.15 +xs_test xs_random xs_stress xs_crashme: LDFLAGS+=-lpthread
    4.16  xs_test: xs_test.o xs_lib.o utils.o
    4.17  xs_random: xs_random.o xs_test_lib.o xs_lib.o talloc.o utils.o
    4.18  xs_stress: xs_stress.o xs_test_lib.o xs_lib.o talloc.o utils.o
    4.19 @@ -69,7 +69,7 @@ talloc_test.o: talloc.c
    4.20  	$(COMPILE.c) -o $@ $<
    4.21  
    4.22  libxenstore.so: xs.opic xs_lib.opic
    4.23 -	$(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^
    4.24 +	$(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^ -lpthread
    4.25  
    4.26  clean: testsuite-clean
    4.27  	rm -f *.o *.opic *.so
     5.1 --- a/tools/xenstore/testsuite/12readonly.test	Sat Oct 08 10:22:01 2005 +0100
     5.2 +++ b/tools/xenstore/testsuite/12readonly.test	Sat Oct 08 19:19:27 2005 +0100
     5.3 @@ -27,8 +27,6 @@ expect setperm failed: Permission denied
     5.4  setperm /test 100 NONE
     5.5  expect setperm failed: Permission denied
     5.6  setperm /test 100 NONE
     5.7 -expect shutdown failed: Permission denied
     5.8 -shutdown
     5.9  expect introduce failed: Permission denied
    5.10  introduce 1 100 7 /home
    5.11  
     6.1 --- a/tools/xenstore/testsuite/15nowait.test	Sat Oct 08 10:22:01 2005 +0100
     6.2 +++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
     6.3 @@ -1,25 +0,0 @@
     6.4 -# If we don't wait for an ack, we can crash daemon as it never expects to be
     6.5 -# sending out two replies on top of each other.
     6.6 -noackwrite /1 1
     6.7 -noackwrite /2 2
     6.8 -noackwrite /3 3
     6.9 -noackwrite /4 4
    6.10 -noackwrite /5 5
    6.11 -readack
    6.12 -readack
    6.13 -readack
    6.14 -readack
    6.15 -readack
    6.16 -
    6.17 -expect handle is 1
    6.18 -introduce 1 100 7 /my/home
    6.19 -1 noackwrite /1 1
    6.20 -1 noackwrite /2 2
    6.21 -1 noackwrite /3 3
    6.22 -1 noackwrite /4 4
    6.23 -1 noackwrite /5 5
    6.24 -1 readack
    6.25 -1 readack
    6.26 -1 readack
    6.27 -1 readack
    6.28 -1 readack
     7.1 --- a/tools/xenstore/testsuite/16block-watch-crash.test	Sat Oct 08 10:22:01 2005 +0100
     7.2 +++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
     7.3 @@ -1,14 +0,0 @@
     7.4 -# Test case where blocked connection gets sent watch.
     7.5 -
     7.6 -# FIXME: We no longer block connections 
     7.7 -# mkdir /test
     7.8 -# watch /test token
     7.9 -# 1 start
    7.10 -# # This will block on above
    7.11 -# noackwrite /test/entry contents
    7.12 -# 1 write /test/entry2 contents
    7.13 -# 1 commit
    7.14 -# readack
    7.15 -# expect /test/entry2:token
    7.16 -# waitwatch
    7.17 -# ackwatch token
     8.1 --- a/tools/xenstore/testsuite/test.sh	Sat Oct 08 10:22:01 2005 +0100
     8.2 +++ b/tools/xenstore/testsuite/test.sh	Sat Oct 08 19:19:27 2005 +0100
     8.3 @@ -23,7 +23,8 @@ run_test()
     8.4  	    cat testsuite/tmp/xenstored_errors
     8.5  	    return 1
     8.6  	fi
     8.7 -	echo shutdown | ./xs_test
     8.8 +	kill $PID
     8.9 +	sleep 1
    8.10  	return 0
    8.11      else
    8.12  	# In case daemon is wedged.
     9.1 --- a/tools/xenstore/xenstored_core.c	Sat Oct 08 10:22:01 2005 +0100
     9.2 +++ b/tools/xenstore/xenstored_core.c	Sat Oct 08 19:19:27 2005 +0100
     9.3 @@ -150,7 +150,6 @@ static char *sockmsg_string(enum xsd_soc
     9.4  {
     9.5  	switch (type) {
     9.6  	case XS_DEBUG: return "DEBUG";
     9.7 -	case XS_SHUTDOWN: return "SHUTDOWN";
     9.8  	case XS_DIRECTORY: return "DIRECTORY";
     9.9  	case XS_READ: return "READ";
    9.10  	case XS_GET_PERMS: return "GET_PERMS";
    9.11 @@ -1083,17 +1082,6 @@ static void process_message(struct conne
    9.12  		do_set_perms(conn, in);
    9.13  		break;
    9.14  
    9.15 -	case XS_SHUTDOWN:
    9.16 -		/* FIXME: Implement gentle shutdown too. */
    9.17 -		/* Only tools can do this. */
    9.18 -		if (conn->id != 0 || !conn->can_write) {
    9.19 -			send_error(conn, EACCES);
    9.20 -			break;
    9.21 -		}
    9.22 -		send_ack(conn, XS_SHUTDOWN);
    9.23 -		/* Everything hangs off auto-free context, freed at exit. */
    9.24 -		exit(0);
    9.25 -
    9.26  	case XS_DEBUG:
    9.27  		if (streq(in->buffer, "print"))
    9.28  			xprintf("debug: %s", in->buffer + get_string(in, 0));
    10.1 --- a/tools/xenstore/xs.c	Sat Oct 08 10:22:01 2005 +0100
    10.2 +++ b/tools/xenstore/xs.c	Sat Oct 08 19:19:27 2005 +0100
    10.3 @@ -32,80 +32,151 @@
    10.4  #include <stdint.h>
    10.5  #include <errno.h>
    10.6  #include <sys/ioctl.h>
    10.7 +#include <pthread.h>
    10.8  #include "xs.h"
    10.9 +#include "list.h"
   10.10  #include "utils.h"
   10.11  
   10.12 -struct xs_handle
   10.13 -{
   10.14 -	int fd;
   10.15 +struct xs_stored_msg {
   10.16 +	struct list_head list;
   10.17 +	struct xsd_sockmsg hdr;
   10.18 +	char *body;
   10.19  };
   10.20  
   10.21 -/* Get the socket from the store daemon handle.
   10.22 - */
   10.23 +struct xs_handle {
   10.24 +	/* Communications channel to xenstore daemon. */
   10.25 +	int fd;
   10.26 +
   10.27 +	/*
   10.28 +         * A read thread which pulls messages off the comms channel and
   10.29 +         * signals waiters.
   10.30 +         */
   10.31 +	pthread_t read_thr;
   10.32 +
   10.33 +	/*
   10.34 +         * A list of fired watch messages, protected by a mutex. Users can
   10.35 +         * wait on the conditional variable until a watch is pending.
   10.36 +         */
   10.37 +	struct list_head watch_list;
   10.38 +	pthread_mutex_t watch_mutex;
   10.39 +	pthread_cond_t watch_condvar;
   10.40 +
   10.41 +	/* Clients can select() on this pipe to wait for a watch to fire. */
   10.42 +	int watch_pipe[2];
   10.43 +
   10.44 +	/*
   10.45 +         * A list of replies. Currently only one will ever be outstanding
   10.46 +         * because we serialise requests. The requester can wait on the
   10.47 +         * conditional variable for its response.
   10.48 +         */
   10.49 +	struct list_head reply_list;
   10.50 +	pthread_mutex_t reply_mutex;
   10.51 +	pthread_cond_t reply_condvar;
   10.52 +
   10.53 +	/* One request at a time. */
   10.54 +	pthread_mutex_t request_mutex;
   10.55 +
   10.56 +	/* One transaction at a time. */
   10.57 +	pthread_mutex_t transaction_mutex;
   10.58 +};
   10.59 +
   10.60 +static void *read_thread(void *arg);
   10.61 +
   10.62  int xs_fileno(struct xs_handle *h)
   10.63  {
   10.64 -	return h->fd;
   10.65 +	char c = 0;
   10.66 +
   10.67 +	pthread_mutex_lock(&h->watch_mutex);
   10.68 +
   10.69 +	if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
   10.70 +		/* Kick things off if the watch list is already non-empty. */
   10.71 +		if (!list_empty(&h->watch_list))
   10.72 +			while (write(h->watch_pipe[1], &c, 1) != 1)
   10.73 +				continue;
   10.74 +	}
   10.75 +
   10.76 +	pthread_mutex_unlock(&h->watch_mutex);
   10.77 +
   10.78 +	return h->watch_pipe[0];
   10.79  }
   10.80  
   10.81 -static struct xs_handle *get_socket(const char *connect_to)
   10.82 +static int get_socket(const char *connect_to)
   10.83  {
   10.84  	struct sockaddr_un addr;
   10.85  	int sock, saved_errno;
   10.86 -	struct xs_handle *h = NULL;
   10.87  
   10.88  	sock = socket(PF_UNIX, SOCK_STREAM, 0);
   10.89  	if (sock < 0)
   10.90 -		return NULL;
   10.91 +		return -1;
   10.92  
   10.93  	addr.sun_family = AF_UNIX;
   10.94  	strcpy(addr.sun_path, connect_to);
   10.95  
   10.96 -	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
   10.97 -		h = malloc(sizeof(*h));
   10.98 -		if (h) {
   10.99 -			h->fd = sock;
  10.100 -			return h;
  10.101 -		}
  10.102 +	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
  10.103 +		saved_errno = errno;
  10.104 +		close(sock);
  10.105 +		errno = saved_errno;
  10.106 +		return -1;
  10.107  	}
  10.108  
  10.109 -	saved_errno = errno;
  10.110 -	close(sock);
  10.111 -	errno = saved_errno;
  10.112 -	return NULL;
  10.113 +	return sock;
  10.114  }
  10.115  
  10.116 -static struct xs_handle *get_dev(const char *connect_to)
  10.117 +static int get_dev(const char *connect_to)
  10.118  {
  10.119 -	int fd, saved_errno;
  10.120 -	struct xs_handle *h;
  10.121 -
  10.122 -	fd = open(connect_to, O_RDWR);
  10.123 -	if (fd < 0)
  10.124 -		return NULL;
  10.125 -
  10.126 -	h = malloc(sizeof(*h));
  10.127 -	if (h) {
  10.128 -		h->fd = fd;
  10.129 -		return h;
  10.130 -	}
  10.131 -
  10.132 -	saved_errno = errno;
  10.133 -	close(fd);
  10.134 -	errno = saved_errno;
  10.135 -	return NULL;
  10.136 +	return open(connect_to, O_RDWR);
  10.137  }
  10.138  
  10.139  static struct xs_handle *get_handle(const char *connect_to)
  10.140  {
  10.141  	struct stat buf;
  10.142 +	struct xs_handle *h = NULL;
  10.143 +	int fd = -1, saved_errno;
  10.144  
  10.145  	if (stat(connect_to, &buf) != 0)
  10.146 -		return NULL;
  10.147 +		goto error;
  10.148  
  10.149  	if (S_ISSOCK(buf.st_mode))
  10.150 -		return get_socket(connect_to);
  10.151 +		fd = get_socket(connect_to);
  10.152  	else
  10.153 -		return get_dev(connect_to);
  10.154 +		fd = get_dev(connect_to);
  10.155 +
  10.156 +	if (fd == -1)
  10.157 +		goto error;
  10.158 +
  10.159 +	h = malloc(sizeof(*h));
  10.160 +	if (h == NULL)
  10.161 +		goto error;
  10.162 +
  10.163 +	h->fd = fd;
  10.164 +
  10.165 +	/* Watch pipe is allocated on demand in xs_fileno(). */
  10.166 +	h->watch_pipe[0] = h->watch_pipe[1] = -1;
  10.167 +
  10.168 +	INIT_LIST_HEAD(&h->watch_list);
  10.169 +	pthread_mutex_init(&h->watch_mutex, NULL);
  10.170 +	pthread_cond_init(&h->watch_condvar, NULL);
  10.171 +
  10.172 +	INIT_LIST_HEAD(&h->reply_list);
  10.173 +	pthread_mutex_init(&h->reply_mutex, NULL);
  10.174 +	pthread_cond_init(&h->reply_condvar, NULL);
  10.175 +
  10.176 +	pthread_mutex_init(&h->request_mutex, NULL);
  10.177 +	pthread_mutex_init(&h->transaction_mutex, NULL);
  10.178 +
  10.179 +	if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
  10.180 +		goto error;
  10.181 +
  10.182 +	return h;
  10.183 +
  10.184 + error:
  10.185 +	saved_errno = errno;
  10.186 +	if (h != NULL)
  10.187 +		free(h);
  10.188 +	if (fd != -1)
  10.189 +		close(fd);
  10.190 +	errno = saved_errno;
  10.191 +	return NULL;
  10.192  }
  10.193  
  10.194  struct xs_handle *xs_daemon_open(void)
  10.195 @@ -125,8 +196,39 @@ struct xs_handle *xs_domain_open(void)
  10.196  
  10.197  void xs_daemon_close(struct xs_handle *h)
  10.198  {
  10.199 -	if (h->fd >= 0)
  10.200 -		close(h->fd);
  10.201 +	struct xs_stored_msg *msg, *tmsg;
  10.202 +
  10.203 +	pthread_mutex_lock(&h->transaction_mutex);
  10.204 +	pthread_mutex_lock(&h->request_mutex);
  10.205 +	pthread_mutex_lock(&h->reply_mutex);
  10.206 +	pthread_mutex_lock(&h->watch_mutex);
  10.207 +
  10.208 +	/* XXX FIXME: May leak an unpublished message buffer. */
  10.209 +	pthread_cancel(h->read_thr);
  10.210 +	pthread_join(h->read_thr, NULL);
  10.211 +
  10.212 +	list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
  10.213 +		free(msg->body);
  10.214 +		free(msg);
  10.215 +	}
  10.216 +
  10.217 +	list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
  10.218 +		free(msg->body);
  10.219 +		free(msg);
  10.220 +	}
  10.221 +
  10.222 +	pthread_mutex_unlock(&h->transaction_mutex);
  10.223 +	pthread_mutex_unlock(&h->request_mutex);
  10.224 +	pthread_mutex_unlock(&h->reply_mutex);
  10.225 +	pthread_mutex_unlock(&h->watch_mutex);
  10.226 +
  10.227 +	if (h->watch_pipe[0] != -1) {
  10.228 +		close(h->watch_pipe[0]);
  10.229 +		close(h->watch_pipe[1]);
  10.230 +	}
  10.231 +
  10.232 +	close(h->fd);
  10.233 +
  10.234  	free(h);
  10.235  }
  10.236  
  10.237 @@ -169,31 +271,28 @@ static int get_error(const char *errorst
  10.238  }
  10.239  
  10.240  /* Adds extra nul terminator, because we generally (always?) hold strings. */
  10.241 -static void *read_reply(int fd, enum xsd_sockmsg_type *type, unsigned int *len)
  10.242 +static void *read_reply(
  10.243 +	struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
  10.244  {
  10.245 -	struct xsd_sockmsg msg;
  10.246 -	void *ret;
  10.247 -	int saved_errno;
  10.248 -
  10.249 -	if (!read_all(fd, &msg, sizeof(msg)))
  10.250 -		return NULL;
  10.251 -
  10.252 -	ret = malloc(msg.len + 1);
  10.253 -	if (!ret)
  10.254 -		return NULL;
  10.255 +	struct xs_stored_msg *msg;
  10.256 +	char *body;
  10.257  
  10.258 -	if (!read_all(fd, ret, msg.len)) {
  10.259 -		saved_errno = errno;
  10.260 -		free(ret);
  10.261 -		errno = saved_errno;
  10.262 -		return NULL;
  10.263 -	}
  10.264 +	pthread_mutex_lock(&h->reply_mutex);
  10.265 +	while (list_empty(&h->reply_list))
  10.266 +		pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
  10.267 +	msg = list_top(&h->reply_list, struct xs_stored_msg, list);
  10.268 +	list_del(&msg->list);
  10.269 +	assert(list_empty(&h->reply_list));
  10.270 +	pthread_mutex_unlock(&h->reply_mutex);
  10.271  
  10.272 -	*type = msg.type;
  10.273 +	*type = msg->hdr.type;
  10.274  	if (len)
  10.275 -		*len = msg.len;
  10.276 -	((char *)ret)[msg.len] = '\0';
  10.277 -	return ret;
  10.278 +		*len = msg->hdr.len;
  10.279 +	body = msg->body;
  10.280 +
  10.281 +	free(msg);
  10.282 +
  10.283 +	return body;
  10.284  }
  10.285  
  10.286  /* Send message to xs, get malloc'ed reply.  NULL and set errno on error. */
  10.287 @@ -217,6 +316,8 @@ static void *xs_talkv(struct xs_handle *
  10.288  	ignorepipe.sa_flags = 0;
  10.289  	sigaction(SIGPIPE, &ignorepipe, &oldact);
  10.290  
  10.291 +	pthread_mutex_lock(&h->request_mutex);
  10.292 +
  10.293  	if (!xs_write_all(h->fd, &msg, sizeof(msg)))
  10.294  		goto fail;
  10.295  
  10.296 @@ -224,14 +325,11 @@ static void *xs_talkv(struct xs_handle *
  10.297  		if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
  10.298  			goto fail;
  10.299  
  10.300 -	/* Watches can have fired before reply comes: daemon detects
  10.301 -	 * and re-transmits, so we can ignore this. */
  10.302 -	do {
  10.303 -		free(ret);
  10.304 -		ret = read_reply(h->fd, &msg.type, len);
  10.305 -		if (!ret)
  10.306 -			goto fail;
  10.307 -	} while (msg.type == XS_WATCH_EVENT);
  10.308 +	ret = read_reply(h, &msg.type, len);
  10.309 +	if (!ret)
  10.310 +		goto fail;
  10.311 +
  10.312 +	pthread_mutex_unlock(&h->request_mutex);
  10.313  
  10.314  	sigaction(SIGPIPE, &oldact, NULL);
  10.315  	if (msg.type == XS_ERROR) {
  10.316 @@ -252,6 +350,7 @@ static void *xs_talkv(struct xs_handle *
  10.317  fail:
  10.318  	/* We're in a bad state, so close fd. */
  10.319  	saved_errno = errno;
  10.320 +	pthread_mutex_unlock(&h->request_mutex);
  10.321  	sigaction(SIGPIPE, &oldact, NULL);
  10.322  close_fd:
  10.323  	close(h->fd);
  10.324 @@ -449,39 +548,45 @@ bool xs_watch(struct xs_handle *h, const
  10.325   */
  10.326  char **xs_read_watch(struct xs_handle *h, unsigned int *num)
  10.327  {
  10.328 -	struct xsd_sockmsg msg;
  10.329 -	char **ret;
  10.330 -	char *strings;
  10.331 +	struct xs_stored_msg *msg;
  10.332 +	char **ret, *strings, c = 0;
  10.333  	unsigned int num_strings, i;
  10.334  
  10.335 -	if (!read_all(h->fd, &msg, sizeof(msg)))
  10.336 -		return NULL;
  10.337 +	pthread_mutex_lock(&h->watch_mutex);
  10.338  
  10.339 -	assert(msg.type == XS_WATCH_EVENT);
  10.340 -	strings = malloc(msg.len);
  10.341 -	if (!strings)
  10.342 -		return NULL;
  10.343 +	/* Wait on the condition variable for a watch to fire. */
  10.344 +	while (list_empty(&h->watch_list))
  10.345 +		pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
  10.346 +	msg = list_top(&h->watch_list, struct xs_stored_msg, list);
  10.347 +	list_del(&msg->list);
  10.348  
  10.349 -	if (!read_all(h->fd, strings, msg.len)) {
  10.350 -		free_no_errno(strings);
  10.351 -		return NULL;
  10.352 -	}
  10.353 +	/* Clear the pipe token if there are no more pending watches. */
  10.354 +	if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
  10.355 +		while (read(h->watch_pipe[0], &c, 1) != 1)
  10.356 +			continue;
  10.357 +
  10.358 +	pthread_mutex_unlock(&h->watch_mutex);
  10.359  
  10.360 -	num_strings = xs_count_strings(strings, msg.len);
  10.361 +	assert(msg->hdr.type == XS_WATCH_EVENT);
  10.362  
  10.363 -	ret = malloc(sizeof(char*) * num_strings + msg.len);
  10.364 +	strings     = msg->body;
  10.365 +	num_strings = xs_count_strings(strings, msg->hdr.len);
  10.366 +
  10.367 +	ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
  10.368  	if (!ret) {
  10.369  		free_no_errno(strings);
  10.370 +		free_no_errno(msg);
  10.371  		return NULL;
  10.372  	}
  10.373  
  10.374  	ret[0] = (char *)(ret + num_strings);
  10.375 -	memcpy(ret[0], strings, msg.len);
  10.376 -	free(strings);
  10.377 +	memcpy(ret[0], strings, msg->hdr.len);
  10.378  
  10.379 -	for (i = 1; i < num_strings; i++) {
  10.380 +	free(strings);
  10.381 +	free(msg);
  10.382 +
  10.383 +	for (i = 1; i < num_strings; i++)
  10.384  		ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
  10.385 -	}
  10.386  
  10.387  	*num = num_strings;
  10.388  
  10.389 @@ -519,6 +624,7 @@ bool xs_unwatch(struct xs_handle *h, con
  10.390   */
  10.391  bool xs_transaction_start(struct xs_handle *h)
  10.392  {
  10.393 +	pthread_mutex_lock(&h->transaction_mutex);
  10.394  	return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
  10.395  }
  10.396  
  10.397 @@ -530,12 +636,18 @@ bool xs_transaction_start(struct xs_hand
  10.398  bool xs_transaction_end(struct xs_handle *h, bool abort)
  10.399  {
  10.400  	char abortstr[2];
  10.401 +	bool rc;
  10.402  
  10.403  	if (abort)
  10.404  		strcpy(abortstr, "F");
  10.405  	else
  10.406  		strcpy(abortstr, "T");
  10.407 -	return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
  10.408 +	
  10.409 +	rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
  10.410 +
  10.411 +	pthread_mutex_unlock(&h->transaction_mutex);
  10.412 +
  10.413 +	return rc;
  10.414  }
  10.415  
  10.416  /* Introduce a new domain.
  10.417 @@ -584,18 +696,6 @@ char *xs_get_domain_path(struct xs_handl
  10.418  	return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
  10.419  }
  10.420  
  10.421 -bool xs_shutdown(struct xs_handle *h)
  10.422 -{
  10.423 -	bool ret = xs_bool(xs_single(h, XS_SHUTDOWN, "", NULL));
  10.424 -	if (ret) {
  10.425 -		char c;
  10.426 -		/* Wait for it to actually shutdown. */
  10.427 -		while ((read(h->fd, &c, 1) < 0) && (errno == EINTR))
  10.428 -			continue;
  10.429 -	}
  10.430 -	return ret;
  10.431 -}
  10.432 -
  10.433  /* Only useful for DEBUG versions */
  10.434  char *xs_debug_command(struct xs_handle *h, const char *cmd,
  10.435  		       void *data, unsigned int len)
  10.436 @@ -609,3 +709,75 @@ char *xs_debug_command(struct xs_handle 
  10.437  
  10.438  	return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
  10.439  }
  10.440 +
  10.441 +static void *read_thread(void *arg)
  10.442 +{
  10.443 +	struct xs_handle *h = arg;
  10.444 +	struct xs_stored_msg *msg = NULL;
  10.445 +	char *body = NULL;
  10.446 +
  10.447 +	for (;;) {
  10.448 +		msg = NULL;
  10.449 +		body = NULL;
  10.450 +
  10.451 +		/* Allocate message structure and read the message header. */
  10.452 +		msg = malloc(sizeof(*msg));
  10.453 +		if (msg == NULL)
  10.454 +			goto error;
  10.455 +		if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
  10.456 +			goto error;
  10.457 +
  10.458 +		/* Allocate and read the message body. */
  10.459 +		body = msg->body = malloc(msg->hdr.len + 1);
  10.460 +		if (body == NULL)
  10.461 +			goto error;
  10.462 +		if (!read_all(h->fd, body, msg->hdr.len))
  10.463 +			goto error;
  10.464 +		body[msg->hdr.len] = '\0';
  10.465 +
  10.466 +		if (msg->hdr.type == XS_WATCH_EVENT) {
  10.467 +			pthread_mutex_lock(&h->watch_mutex);
  10.468 +
  10.469 +			/* Kick users out of their select() loop. */
  10.470 +			if (list_empty(&h->watch_list) &&
  10.471 +			    (h->watch_pipe[1] != -1))
  10.472 +				while (write(h->watch_pipe[1], body, 1) != 1)
  10.473 +					continue;
  10.474 +
  10.475 +			list_add_tail(&msg->list, &h->watch_list);
  10.476 +			pthread_cond_signal(&h->watch_condvar);
  10.477 +
  10.478 +			pthread_mutex_unlock(&h->watch_mutex);
  10.479 +		} else {
  10.480 +			pthread_mutex_lock(&h->reply_mutex);
  10.481 +
  10.482 +			/* There should only ever be one response pending! */
  10.483 +			if (!list_empty(&h->reply_list)) {
  10.484 +				pthread_mutex_unlock(&h->reply_mutex);
  10.485 +				goto error;
  10.486 +			}
  10.487 +
  10.488 +			list_add_tail(&msg->list, &h->reply_list);
  10.489 +			pthread_cond_signal(&h->reply_condvar);
  10.490 +
  10.491 +			pthread_mutex_unlock(&h->reply_mutex);
  10.492 +		}
  10.493 +	}
  10.494 +
  10.495 + error:
  10.496 +	if (body != NULL)
  10.497 +		free(body);
  10.498 +	if (msg != NULL)
  10.499 +		free(msg);
  10.500 +	return NULL;
  10.501 +}
  10.502 +
  10.503 +/*
  10.504 + * Local variables:
  10.505 + *  c-file-style: "linux"
  10.506 + *  indent-tabs-mode: t
  10.507 + *  c-indent-level: 8
  10.508 + *  c-basic-offset: 8
  10.509 + *  tab-width: 8
  10.510 + * End:
  10.511 + */
    11.1 --- a/tools/xenstore/xs.h	Sat Oct 08 10:22:01 2005 +0100
    11.2 +++ b/tools/xenstore/xs.h	Sat Oct 08 19:19:27 2005 +0100
    11.3 @@ -141,7 +141,4 @@ char *xs_get_domain_path(struct xs_handl
    11.4  char *xs_debug_command(struct xs_handle *h, const char *cmd,
    11.5  		       void *data, unsigned int len);
    11.6  
    11.7 -/* Shut down the daemon. */
    11.8 -bool xs_shutdown(struct xs_handle *h);
    11.9 -
   11.10  #endif /* _XS_H */
    12.1 --- a/tools/xenstore/xs_random.c	Sat Oct 08 10:22:01 2005 +0100
    12.2 +++ b/tools/xenstore/xs_random.c	Sat Oct 08 19:19:27 2005 +0100
    12.3 @@ -879,20 +879,11 @@ static int daemon_pid;
    12.4  static void cleanup_xs_ops(void)
    12.5  {
    12.6  	char *cmd;
    12.7 +
    12.8  	if (daemon_pid) {
    12.9 -		struct xs_handle *h;
   12.10 -		h = xs_daemon_open();
   12.11 -		if (h) {
   12.12 -			if (xs_shutdown(h)) {
   12.13 -				waitpid(daemon_pid, NULL, 0);
   12.14 -				daemon_pid = 0;
   12.15 -			}
   12.16 -			xs_daemon_close(h);
   12.17 -		}
   12.18 -		if (daemon_pid) {
   12.19 -			kill(daemon_pid, SIGTERM);
   12.20 -			waitpid(daemon_pid, NULL, 0);
   12.21 -		}
   12.22 +		kill(daemon_pid, SIGTERM);
   12.23 +		waitpid(daemon_pid, NULL, 0);
   12.24 +		daemon_pid = 0;
   12.25  	}
   12.26  	
   12.27  	cmd = talloc_asprintf(NULL, "rm -rf testsuite/tmp/*");
    13.1 --- a/tools/xenstore/xs_test.c	Sat Oct 08 10:22:01 2005 +0100
    13.2 +++ b/tools/xenstore/xs_test.c	Sat Oct 08 19:19:27 2005 +0100
    13.3 @@ -198,7 +198,6 @@ static void __attribute__((noreturn)) us
    13.4  	     "  rm <path>\n"
    13.5  	     "  getperm <path>\n"
    13.6  	     "  setperm <path> <id> <flags> ...\n"
    13.7 -	     "  shutdown\n"
    13.8  	     "  watch <path> <token>\n"
    13.9  	     "  watchnoack <path> <token>\n"
   13.10  	     "  waitwatch\n"
   13.11 @@ -214,8 +213,6 @@ static void __attribute__((noreturn)) us
   13.12  	     "  notimeout\n"
   13.13  	     "  readonly\n"
   13.14  	     "  readwrite\n"
   13.15 -	     "  noackwrite <path> <value>...\n"
   13.16 -	     "  readack\n"
   13.17  	     "  dump\n");
   13.18  }
   13.19  
   13.20 @@ -355,37 +352,6 @@ static void do_write(unsigned int handle
   13.21  		failed(handle);
   13.22  }
   13.23  
   13.24 -static void do_noackwrite(unsigned int handle,
   13.25 -			  char *path, char *data)
   13.26 -{
   13.27 -	struct xsd_sockmsg msg;
   13.28 -
   13.29 -	msg.len = strlen(path) + 1 + strlen(data);
   13.30 -	msg.type = XS_WRITE;
   13.31 -	if (!write_all_choice(handles[handle]->fd, &msg, sizeof(msg)))
   13.32 -		failed(handle);
   13.33 -	if (!write_all_choice(handles[handle]->fd, path, strlen(path) + 1))
   13.34 -		failed(handle);
   13.35 -	if (!write_all_choice(handles[handle]->fd, data, strlen(data)))
   13.36 -		failed(handle);
   13.37 -	/* Do not wait for ack. */
   13.38 -}
   13.39 -
   13.40 -static void do_readack(unsigned int handle)
   13.41 -{
   13.42 -	enum xsd_sockmsg_type type;
   13.43 -	char *ret = NULL;
   13.44 -
   13.45 -	/* Watches can have fired before reply comes: daemon detects
   13.46 -	 * and re-transmits, so we can ignore this. */
   13.47 -	do {
   13.48 -		free(ret);
   13.49 -		ret = read_reply(handles[handle]->fd, &type, NULL);
   13.50 -		if (!ret)
   13.51 -			failed(handle);
   13.52 -	} while (type == XS_WATCH_EVENT);
   13.53 -}
   13.54 -
   13.55  static void do_setid(unsigned int handle, char *id)
   13.56  {
   13.57  	if (!xs_bool(xs_debug_command(handles[handle], "setid", id,
   13.58 @@ -475,12 +441,6 @@ static void do_setperm(unsigned int hand
   13.59  		failed(handle);
   13.60  }
   13.61  
   13.62 -static void do_shutdown(unsigned int handle)
   13.63 -{
   13.64 -	if (!xs_shutdown(handles[handle]))
   13.65 -		failed(handle);
   13.66 -}
   13.67 -
   13.68  static void do_watch(unsigned int handle, const char *node, const char *token,
   13.69  		     bool swallow_event)
   13.70  {
   13.71 @@ -780,8 +740,6 @@ static void do_command(unsigned int defa
   13.72  		do_getperm(handle, arg(line, 1));
   13.73  	else if (streq(command, "setperm"))
   13.74  		do_setperm(handle, arg(line, 1), line);
   13.75 -	else if (streq(command, "shutdown"))
   13.76 -		do_shutdown(handle);
   13.77  	else if (streq(command, "watch"))
   13.78  		do_watch(handle, arg(line, 1), arg(line, 2), true);
   13.79  	else if (streq(command, "watchnoack"))
   13.80 @@ -823,11 +781,7 @@ static void do_command(unsigned int defa
   13.81  		readonly = false;
   13.82  		xs_daemon_close(handles[handle]);
   13.83  		handles[handle] = NULL;
   13.84 -	} else if (streq(command, "noackwrite"))
   13.85 -		do_noackwrite(handle, arg(line,1), arg(line,2));
   13.86 -	else if (streq(command, "readack"))
   13.87 -		do_readack(handle);
   13.88 -	else
   13.89 +	} else
   13.90  		barf("Unknown command %s", command);
   13.91  	fflush(stdout);
   13.92  	disarm_timeout();
    14.1 --- a/xen/include/public/io/xs_wire.h	Sat Oct 08 10:22:01 2005 +0100
    14.2 +++ b/xen/include/public/io/xs_wire.h	Sat Oct 08 19:19:27 2005 +0100
    14.3 @@ -31,7 +31,6 @@
    14.4  enum xsd_sockmsg_type
    14.5  {
    14.6  	XS_DEBUG,
    14.7 -	XS_SHUTDOWN,
    14.8  	XS_DIRECTORY,
    14.9  	XS_READ,
   14.10  	XS_GET_PERMS,