ia64/xen-unstable
changeset 7277:2144de6eabcc
Make libxenstore thread-safe. It also spawns an internal
thread to read messages from the comms channel.
Signed-off-by: Keir Fraser <keir@xensource.com>
thread to read messages from the comms channel.
Signed-off-by: Keir Fraser <keir@xensource.com>
author | kaf24@firebug.cl.cam.ac.uk |
---|---|
date | Sat Oct 08 19:19:27 2005 +0100 (2005-10-08) |
parents | e69413dca684 |
children | 4e0c94871be2 17f110647efa |
files | tools/python/xen/lowlevel/xs/xs.c tools/python/xen/xend/xenstore/xsutil.py tools/python/xen/xend/xenstore/xswatch.py tools/xenstore/Makefile tools/xenstore/testsuite/12readonly.test tools/xenstore/testsuite/test.sh tools/xenstore/xenstored_core.c tools/xenstore/xs.c tools/xenstore/xs.h tools/xenstore/xs_random.c tools/xenstore/xs_test.c xen/include/public/io/xs_wire.h |
line diff
1.1 --- a/tools/python/xen/lowlevel/xs/xs.c Sat Oct 08 10:22:01 2005 +0100 1.2 +++ b/tools/python/xen/lowlevel/xs/xs.c Sat Oct 08 19:19:27 2005 +0100 1.3 @@ -775,39 +775,6 @@ static PyObject *xspy_close(PyObject *se 1.4 return val; 1.5 } 1.6 1.7 -#define xspy_shutdown_doc "\n" \ 1.8 - "Shutdown the xenstore daemon.\n" \ 1.9 - "\n" \ 1.10 - "Returns None on success.\n" \ 1.11 - "Raises RuntimeError on error.\n" \ 1.12 - "\n" 1.13 - 1.14 -static PyObject *xspy_shutdown(PyObject *self, PyObject *args, PyObject *kwds) 1.15 -{ 1.16 - static char *kwd_spec[] = { NULL }; 1.17 - static char *arg_spec = ""; 1.18 - 1.19 - struct xs_handle *xh = xshandle(self); 1.20 - PyObject *val = NULL; 1.21 - int xsval = 0; 1.22 - 1.23 - if (!xh) 1.24 - goto exit; 1.25 - if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec)) 1.26 - goto exit; 1.27 - Py_BEGIN_ALLOW_THREADS 1.28 - xsval = xs_shutdown(xh); 1.29 - Py_END_ALLOW_THREADS 1.30 - if (!xsval) { 1.31 - PyErr_SetFromErrno(PyExc_RuntimeError); 1.32 - goto exit; 1.33 - } 1.34 - Py_INCREF(Py_None); 1.35 - val = Py_None; 1.36 - exit: 1.37 - return val; 1.38 -} 1.39 - 1.40 #define xspy_get_domain_path_doc "\n" \ 1.41 "Return store path of domain.\n" \ 1.42 " domid [int]: domain id\n" \ 1.43 @@ -850,28 +817,6 @@ static PyObject *xspy_get_domain_path(Py 1.44 return val; 1.45 } 1.46 1.47 -#define xspy_fileno_doc "\n" \ 1.48 - "Get the file descriptor of the xenstore socket.\n" \ 1.49 - "Allows an xs object to be passed to select().\n" \ 1.50 - "\n" \ 1.51 - "Returns: [int] file descriptor.\n" \ 1.52 - "\n" 1.53 - 1.54 -static PyObject *xspy_fileno(PyObject *self, PyObject *args, PyObject *kwds) 1.55 -{ 1.56 - static char *kwd_spec[] = { NULL }; 1.57 - static char *arg_spec = ""; 1.58 - 1.59 - struct xs_handle *xh = xshandle(self); 1.60 - PyObject *val = NULL; 1.61 - 1.62 - if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec)) 1.63 - goto exit; 1.64 - val = PyInt_FromLong((xh ? xs_fileno(xh) : -1)); 1.65 - exit: 1.66 - return val; 1.67 -} 1.68 - 1.69 #define XSPY_METH(_name) { \ 1.70 .ml_name = #_name, \ 1.71 .ml_meth = (PyCFunction) xspy_ ## _name, \ 1.72 @@ -895,9 +840,7 @@ static PyMethodDef xshandle_methods[] = 1.73 XSPY_METH(introduce_domain), 1.74 XSPY_METH(release_domain), 1.75 XSPY_METH(close), 1.76 - XSPY_METH(shutdown), 1.77 XSPY_METH(get_domain_path), 1.78 - XSPY_METH(fileno), 1.79 { /* Terminator. */ }, 1.80 }; 1.81
2.1 --- a/tools/python/xen/xend/xenstore/xsutil.py Sat Oct 08 10:22:01 2005 +0100 2.2 +++ b/tools/python/xen/xend/xenstore/xsutil.py Sat Oct 08 19:19:27 2005 +0100 2.3 @@ -7,14 +7,17 @@ 2.4 import threading 2.5 from xen.lowlevel import xs 2.6 2.7 -handles = {} 2.8 +xs_lock = threading.Lock() 2.9 +xs_handle = None 2.10 2.11 -# XXX need to g/c handles from dead threads 2.12 def xshandle(): 2.13 - if not handles.has_key(threading.currentThread()): 2.14 - handles[threading.currentThread()] = xs.open() 2.15 - return handles[threading.currentThread()] 2.16 - 2.17 + global xs_handle, xs_lock 2.18 + if not xs_handle: 2.19 + xs_lock.acquire() 2.20 + if not xs_handle: 2.21 + xs_handle = xs.open() 2.22 + xs_lock.release() 2.23 + return xs_handle 2.24 2.25 def IntroduceDomain(domid, page, port, path): 2.26 return xshandle().introduce_domain(domid, page, port, path)
3.1 --- a/tools/python/xen/xend/xenstore/xswatch.py Sat Oct 08 10:22:01 2005 +0100 3.2 +++ b/tools/python/xen/xend/xenstore/xswatch.py Sat Oct 08 19:19:27 2005 +0100 3.3 @@ -12,7 +12,6 @@ from xen.lowlevel import xs 3.4 class xswatch: 3.5 3.6 watchThread = None 3.7 - threadcond = threading.Condition() 3.8 xs = None 3.9 xslock = threading.Lock() 3.10 3.11 @@ -21,43 +20,31 @@ class xswatch: 3.12 self.args = args 3.13 self.kwargs = kwargs 3.14 xswatch.watchStart() 3.15 - xswatch.xslock.acquire() 3.16 xswatch.xs.watch(path, self) 3.17 - xswatch.xslock.release() 3.18 3.19 def watchStart(cls): 3.20 - cls.threadcond.acquire() 3.21 + cls.xslock.acquire() 3.22 if cls.watchThread: 3.23 - cls.threadcond.release() 3.24 + cls.xslock.release() 3.25 return 3.26 + # XXX: When we fix xenstored to have better watch semantics, 3.27 + # this can change to shared xshandle(). Currently that would result 3.28 + # in duplicate watch firings, thus failed extra xs.acknowledge_watch. 3.29 + cls.xs = xs.open() 3.30 cls.watchThread = threading.Thread(name="Watcher", 3.31 target=cls.watchMain) 3.32 cls.watchThread.setDaemon(True) 3.33 cls.watchThread.start() 3.34 - while cls.xs == None: 3.35 - cls.threadcond.wait() 3.36 - cls.threadcond.release() 3.37 + cls.xslock.release() 3.38 3.39 watchStart = classmethod(watchStart) 3.40 3.41 def watchMain(cls): 3.42 - cls.threadcond.acquire() 3.43 - cls.xs = xs.open() 3.44 - cls.threadcond.notifyAll() 3.45 - cls.threadcond.release() 3.46 while True: 3.47 try: 3.48 - (fd, _1, _2) = select.select([ cls.xs ], [], []) 3.49 - cls.xslock.acquire() 3.50 - # reconfirm ready to read with lock 3.51 - (fd, _1, _2) = select.select([ cls.xs ], [], [], 0.001) 3.52 - if not cls.xs in fd: 3.53 - cls.xslock.release() 3.54 - continue 3.55 we = cls.xs.read_watch() 3.56 watch = we[1] 3.57 cls.xs.acknowledge_watch(watch) 3.58 - cls.xslock.release() 3.59 except RuntimeError, ex: 3.60 print ex 3.61 raise
4.1 --- a/tools/xenstore/Makefile Sat Oct 08 10:22:01 2005 +0100 4.2 +++ b/tools/xenstore/Makefile Sat Oct 08 19:19:27 2005 +0100 4.3 @@ -34,7 +34,6 @@ testcode: xs_test xenstored_test xs_rand 4.4 xenstored: xenstored_core.o xenstored_watch.o xenstored_domain.o xenstored_transaction.o xs_lib.o talloc.o utils.o tdb.o 4.5 $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -o $@ 4.6 4.7 -$(CLIENTS): libxenstore.so 4.8 $(CLIENTS): xenstore-%: xenstore_%.o 4.9 $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -L. -lxenstore -o $@ 4.10 4.11 @@ -47,6 +46,7 @@ xenstored_test: xenstored_core_test.o xe 4.12 xs_tdb_dump: xs_tdb_dump.o utils.o tdb.o talloc.o 4.13 $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ 4.14 4.15 +xs_test xs_random xs_stress xs_crashme: LDFLAGS+=-lpthread 4.16 xs_test: xs_test.o xs_lib.o utils.o 4.17 xs_random: xs_random.o xs_test_lib.o xs_lib.o talloc.o utils.o 4.18 xs_stress: xs_stress.o xs_test_lib.o xs_lib.o talloc.o utils.o 4.19 @@ -69,7 +69,7 @@ talloc_test.o: talloc.c 4.20 $(COMPILE.c) -o $@ $< 4.21 4.22 libxenstore.so: xs.opic xs_lib.opic 4.23 - $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^ 4.24 + $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^ -lpthread 4.25 4.26 clean: testsuite-clean 4.27 rm -f *.o *.opic *.so
5.1 --- a/tools/xenstore/testsuite/12readonly.test Sat Oct 08 10:22:01 2005 +0100 5.2 +++ b/tools/xenstore/testsuite/12readonly.test Sat Oct 08 19:19:27 2005 +0100 5.3 @@ -27,8 +27,6 @@ expect setperm failed: Permission denied 5.4 setperm /test 100 NONE 5.5 expect setperm failed: Permission denied 5.6 setperm /test 100 NONE 5.7 -expect shutdown failed: Permission denied 5.8 -shutdown 5.9 expect introduce failed: Permission denied 5.10 introduce 1 100 7 /home 5.11
6.1 --- a/tools/xenstore/testsuite/15nowait.test Sat Oct 08 10:22:01 2005 +0100 6.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 6.3 @@ -1,25 +0,0 @@ 6.4 -# If we don't wait for an ack, we can crash daemon as it never expects to be 6.5 -# sending out two replies on top of each other. 6.6 -noackwrite /1 1 6.7 -noackwrite /2 2 6.8 -noackwrite /3 3 6.9 -noackwrite /4 4 6.10 -noackwrite /5 5 6.11 -readack 6.12 -readack 6.13 -readack 6.14 -readack 6.15 -readack 6.16 - 6.17 -expect handle is 1 6.18 -introduce 1 100 7 /my/home 6.19 -1 noackwrite /1 1 6.20 -1 noackwrite /2 2 6.21 -1 noackwrite /3 3 6.22 -1 noackwrite /4 4 6.23 -1 noackwrite /5 5 6.24 -1 readack 6.25 -1 readack 6.26 -1 readack 6.27 -1 readack 6.28 -1 readack
7.1 --- a/tools/xenstore/testsuite/16block-watch-crash.test Sat Oct 08 10:22:01 2005 +0100 7.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 7.3 @@ -1,14 +0,0 @@ 7.4 -# Test case where blocked connection gets sent watch. 7.5 - 7.6 -# FIXME: We no longer block connections 7.7 -# mkdir /test 7.8 -# watch /test token 7.9 -# 1 start 7.10 -# # This will block on above 7.11 -# noackwrite /test/entry contents 7.12 -# 1 write /test/entry2 contents 7.13 -# 1 commit 7.14 -# readack 7.15 -# expect /test/entry2:token 7.16 -# waitwatch 7.17 -# ackwatch token
8.1 --- a/tools/xenstore/testsuite/test.sh Sat Oct 08 10:22:01 2005 +0100 8.2 +++ b/tools/xenstore/testsuite/test.sh Sat Oct 08 19:19:27 2005 +0100 8.3 @@ -23,7 +23,8 @@ run_test() 8.4 cat testsuite/tmp/xenstored_errors 8.5 return 1 8.6 fi 8.7 - echo shutdown | ./xs_test 8.8 + kill $PID 8.9 + sleep 1 8.10 return 0 8.11 else 8.12 # In case daemon is wedged.
9.1 --- a/tools/xenstore/xenstored_core.c Sat Oct 08 10:22:01 2005 +0100 9.2 +++ b/tools/xenstore/xenstored_core.c Sat Oct 08 19:19:27 2005 +0100 9.3 @@ -150,7 +150,6 @@ static char *sockmsg_string(enum xsd_soc 9.4 { 9.5 switch (type) { 9.6 case XS_DEBUG: return "DEBUG"; 9.7 - case XS_SHUTDOWN: return "SHUTDOWN"; 9.8 case XS_DIRECTORY: return "DIRECTORY"; 9.9 case XS_READ: return "READ"; 9.10 case XS_GET_PERMS: return "GET_PERMS"; 9.11 @@ -1083,17 +1082,6 @@ static void process_message(struct conne 9.12 do_set_perms(conn, in); 9.13 break; 9.14 9.15 - case XS_SHUTDOWN: 9.16 - /* FIXME: Implement gentle shutdown too. */ 9.17 - /* Only tools can do this. */ 9.18 - if (conn->id != 0 || !conn->can_write) { 9.19 - send_error(conn, EACCES); 9.20 - break; 9.21 - } 9.22 - send_ack(conn, XS_SHUTDOWN); 9.23 - /* Everything hangs off auto-free context, freed at exit. */ 9.24 - exit(0); 9.25 - 9.26 case XS_DEBUG: 9.27 if (streq(in->buffer, "print")) 9.28 xprintf("debug: %s", in->buffer + get_string(in, 0));
10.1 --- a/tools/xenstore/xs.c Sat Oct 08 10:22:01 2005 +0100 10.2 +++ b/tools/xenstore/xs.c Sat Oct 08 19:19:27 2005 +0100 10.3 @@ -32,80 +32,151 @@ 10.4 #include <stdint.h> 10.5 #include <errno.h> 10.6 #include <sys/ioctl.h> 10.7 +#include <pthread.h> 10.8 #include "xs.h" 10.9 +#include "list.h" 10.10 #include "utils.h" 10.11 10.12 -struct xs_handle 10.13 -{ 10.14 - int fd; 10.15 +struct xs_stored_msg { 10.16 + struct list_head list; 10.17 + struct xsd_sockmsg hdr; 10.18 + char *body; 10.19 }; 10.20 10.21 -/* Get the socket from the store daemon handle. 10.22 - */ 10.23 +struct xs_handle { 10.24 + /* Communications channel to xenstore daemon. */ 10.25 + int fd; 10.26 + 10.27 + /* 10.28 + * A read thread which pulls messages off the comms channel and 10.29 + * signals waiters. 10.30 + */ 10.31 + pthread_t read_thr; 10.32 + 10.33 + /* 10.34 + * A list of fired watch messages, protected by a mutex. Users can 10.35 + * wait on the conditional variable until a watch is pending. 10.36 + */ 10.37 + struct list_head watch_list; 10.38 + pthread_mutex_t watch_mutex; 10.39 + pthread_cond_t watch_condvar; 10.40 + 10.41 + /* Clients can select() on this pipe to wait for a watch to fire. */ 10.42 + int watch_pipe[2]; 10.43 + 10.44 + /* 10.45 + * A list of replies. Currently only one will ever be outstanding 10.46 + * because we serialise requests. The requester can wait on the 10.47 + * conditional variable for its response. 10.48 + */ 10.49 + struct list_head reply_list; 10.50 + pthread_mutex_t reply_mutex; 10.51 + pthread_cond_t reply_condvar; 10.52 + 10.53 + /* One request at a time. */ 10.54 + pthread_mutex_t request_mutex; 10.55 + 10.56 + /* One transaction at a time. */ 10.57 + pthread_mutex_t transaction_mutex; 10.58 +}; 10.59 + 10.60 +static void *read_thread(void *arg); 10.61 + 10.62 int xs_fileno(struct xs_handle *h) 10.63 { 10.64 - return h->fd; 10.65 + char c = 0; 10.66 + 10.67 + pthread_mutex_lock(&h->watch_mutex); 10.68 + 10.69 + if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) { 10.70 + /* Kick things off if the watch list is already non-empty. */ 10.71 + if (!list_empty(&h->watch_list)) 10.72 + while (write(h->watch_pipe[1], &c, 1) != 1) 10.73 + continue; 10.74 + } 10.75 + 10.76 + pthread_mutex_unlock(&h->watch_mutex); 10.77 + 10.78 + return h->watch_pipe[0]; 10.79 } 10.80 10.81 -static struct xs_handle *get_socket(const char *connect_to) 10.82 +static int get_socket(const char *connect_to) 10.83 { 10.84 struct sockaddr_un addr; 10.85 int sock, saved_errno; 10.86 - struct xs_handle *h = NULL; 10.87 10.88 sock = socket(PF_UNIX, SOCK_STREAM, 0); 10.89 if (sock < 0) 10.90 - return NULL; 10.91 + return -1; 10.92 10.93 addr.sun_family = AF_UNIX; 10.94 strcpy(addr.sun_path, connect_to); 10.95 10.96 - if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) { 10.97 - h = malloc(sizeof(*h)); 10.98 - if (h) { 10.99 - h->fd = sock; 10.100 - return h; 10.101 - } 10.102 + if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) { 10.103 + saved_errno = errno; 10.104 + close(sock); 10.105 + errno = saved_errno; 10.106 + return -1; 10.107 } 10.108 10.109 - saved_errno = errno; 10.110 - close(sock); 10.111 - errno = saved_errno; 10.112 - return NULL; 10.113 + return sock; 10.114 } 10.115 10.116 -static struct xs_handle *get_dev(const char *connect_to) 10.117 +static int get_dev(const char *connect_to) 10.118 { 10.119 - int fd, saved_errno; 10.120 - struct xs_handle *h; 10.121 - 10.122 - fd = open(connect_to, O_RDWR); 10.123 - if (fd < 0) 10.124 - return NULL; 10.125 - 10.126 - h = malloc(sizeof(*h)); 10.127 - if (h) { 10.128 - h->fd = fd; 10.129 - return h; 10.130 - } 10.131 - 10.132 - saved_errno = errno; 10.133 - close(fd); 10.134 - errno = saved_errno; 10.135 - return NULL; 10.136 + return open(connect_to, O_RDWR); 10.137 } 10.138 10.139 static struct xs_handle *get_handle(const char *connect_to) 10.140 { 10.141 struct stat buf; 10.142 + struct xs_handle *h = NULL; 10.143 + int fd = -1, saved_errno; 10.144 10.145 if (stat(connect_to, &buf) != 0) 10.146 - return NULL; 10.147 + goto error; 10.148 10.149 if (S_ISSOCK(buf.st_mode)) 10.150 - return get_socket(connect_to); 10.151 + fd = get_socket(connect_to); 10.152 else 10.153 - return get_dev(connect_to); 10.154 + fd = get_dev(connect_to); 10.155 + 10.156 + if (fd == -1) 10.157 + goto error; 10.158 + 10.159 + h = malloc(sizeof(*h)); 10.160 + if (h == NULL) 10.161 + goto error; 10.162 + 10.163 + h->fd = fd; 10.164 + 10.165 + /* Watch pipe is allocated on demand in xs_fileno(). */ 10.166 + h->watch_pipe[0] = h->watch_pipe[1] = -1; 10.167 + 10.168 + INIT_LIST_HEAD(&h->watch_list); 10.169 + pthread_mutex_init(&h->watch_mutex, NULL); 10.170 + pthread_cond_init(&h->watch_condvar, NULL); 10.171 + 10.172 + INIT_LIST_HEAD(&h->reply_list); 10.173 + pthread_mutex_init(&h->reply_mutex, NULL); 10.174 + pthread_cond_init(&h->reply_condvar, NULL); 10.175 + 10.176 + pthread_mutex_init(&h->request_mutex, NULL); 10.177 + pthread_mutex_init(&h->transaction_mutex, NULL); 10.178 + 10.179 + if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) 10.180 + goto error; 10.181 + 10.182 + return h; 10.183 + 10.184 + error: 10.185 + saved_errno = errno; 10.186 + if (h != NULL) 10.187 + free(h); 10.188 + if (fd != -1) 10.189 + close(fd); 10.190 + errno = saved_errno; 10.191 + return NULL; 10.192 } 10.193 10.194 struct xs_handle *xs_daemon_open(void) 10.195 @@ -125,8 +196,39 @@ struct xs_handle *xs_domain_open(void) 10.196 10.197 void xs_daemon_close(struct xs_handle *h) 10.198 { 10.199 - if (h->fd >= 0) 10.200 - close(h->fd); 10.201 + struct xs_stored_msg *msg, *tmsg; 10.202 + 10.203 + pthread_mutex_lock(&h->transaction_mutex); 10.204 + pthread_mutex_lock(&h->request_mutex); 10.205 + pthread_mutex_lock(&h->reply_mutex); 10.206 + pthread_mutex_lock(&h->watch_mutex); 10.207 + 10.208 + /* XXX FIXME: May leak an unpublished message buffer. */ 10.209 + pthread_cancel(h->read_thr); 10.210 + pthread_join(h->read_thr, NULL); 10.211 + 10.212 + list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) { 10.213 + free(msg->body); 10.214 + free(msg); 10.215 + } 10.216 + 10.217 + list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) { 10.218 + free(msg->body); 10.219 + free(msg); 10.220 + } 10.221 + 10.222 + pthread_mutex_unlock(&h->transaction_mutex); 10.223 + pthread_mutex_unlock(&h->request_mutex); 10.224 + pthread_mutex_unlock(&h->reply_mutex); 10.225 + pthread_mutex_unlock(&h->watch_mutex); 10.226 + 10.227 + if (h->watch_pipe[0] != -1) { 10.228 + close(h->watch_pipe[0]); 10.229 + close(h->watch_pipe[1]); 10.230 + } 10.231 + 10.232 + close(h->fd); 10.233 + 10.234 free(h); 10.235 } 10.236 10.237 @@ -169,31 +271,28 @@ static int get_error(const char *errorst 10.238 } 10.239 10.240 /* Adds extra nul terminator, because we generally (always?) hold strings. */ 10.241 -static void *read_reply(int fd, enum xsd_sockmsg_type *type, unsigned int *len) 10.242 +static void *read_reply( 10.243 + struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len) 10.244 { 10.245 - struct xsd_sockmsg msg; 10.246 - void *ret; 10.247 - int saved_errno; 10.248 - 10.249 - if (!read_all(fd, &msg, sizeof(msg))) 10.250 - return NULL; 10.251 - 10.252 - ret = malloc(msg.len + 1); 10.253 - if (!ret) 10.254 - return NULL; 10.255 + struct xs_stored_msg *msg; 10.256 + char *body; 10.257 10.258 - if (!read_all(fd, ret, msg.len)) { 10.259 - saved_errno = errno; 10.260 - free(ret); 10.261 - errno = saved_errno; 10.262 - return NULL; 10.263 - } 10.264 + pthread_mutex_lock(&h->reply_mutex); 10.265 + while (list_empty(&h->reply_list)) 10.266 + pthread_cond_wait(&h->reply_condvar, &h->reply_mutex); 10.267 + msg = list_top(&h->reply_list, struct xs_stored_msg, list); 10.268 + list_del(&msg->list); 10.269 + assert(list_empty(&h->reply_list)); 10.270 + pthread_mutex_unlock(&h->reply_mutex); 10.271 10.272 - *type = msg.type; 10.273 + *type = msg->hdr.type; 10.274 if (len) 10.275 - *len = msg.len; 10.276 - ((char *)ret)[msg.len] = '\0'; 10.277 - return ret; 10.278 + *len = msg->hdr.len; 10.279 + body = msg->body; 10.280 + 10.281 + free(msg); 10.282 + 10.283 + return body; 10.284 } 10.285 10.286 /* Send message to xs, get malloc'ed reply. NULL and set errno on error. */ 10.287 @@ -217,6 +316,8 @@ static void *xs_talkv(struct xs_handle * 10.288 ignorepipe.sa_flags = 0; 10.289 sigaction(SIGPIPE, &ignorepipe, &oldact); 10.290 10.291 + pthread_mutex_lock(&h->request_mutex); 10.292 + 10.293 if (!xs_write_all(h->fd, &msg, sizeof(msg))) 10.294 goto fail; 10.295 10.296 @@ -224,14 +325,11 @@ static void *xs_talkv(struct xs_handle * 10.297 if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len)) 10.298 goto fail; 10.299 10.300 - /* Watches can have fired before reply comes: daemon detects 10.301 - * and re-transmits, so we can ignore this. */ 10.302 - do { 10.303 - free(ret); 10.304 - ret = read_reply(h->fd, &msg.type, len); 10.305 - if (!ret) 10.306 - goto fail; 10.307 - } while (msg.type == XS_WATCH_EVENT); 10.308 + ret = read_reply(h, &msg.type, len); 10.309 + if (!ret) 10.310 + goto fail; 10.311 + 10.312 + pthread_mutex_unlock(&h->request_mutex); 10.313 10.314 sigaction(SIGPIPE, &oldact, NULL); 10.315 if (msg.type == XS_ERROR) { 10.316 @@ -252,6 +350,7 @@ static void *xs_talkv(struct xs_handle * 10.317 fail: 10.318 /* We're in a bad state, so close fd. */ 10.319 saved_errno = errno; 10.320 + pthread_mutex_unlock(&h->request_mutex); 10.321 sigaction(SIGPIPE, &oldact, NULL); 10.322 close_fd: 10.323 close(h->fd); 10.324 @@ -449,39 +548,45 @@ bool xs_watch(struct xs_handle *h, const 10.325 */ 10.326 char **xs_read_watch(struct xs_handle *h, unsigned int *num) 10.327 { 10.328 - struct xsd_sockmsg msg; 10.329 - char **ret; 10.330 - char *strings; 10.331 + struct xs_stored_msg *msg; 10.332 + char **ret, *strings, c = 0; 10.333 unsigned int num_strings, i; 10.334 10.335 - if (!read_all(h->fd, &msg, sizeof(msg))) 10.336 - return NULL; 10.337 + pthread_mutex_lock(&h->watch_mutex); 10.338 10.339 - assert(msg.type == XS_WATCH_EVENT); 10.340 - strings = malloc(msg.len); 10.341 - if (!strings) 10.342 - return NULL; 10.343 + /* Wait on the condition variable for a watch to fire. */ 10.344 + while (list_empty(&h->watch_list)) 10.345 + pthread_cond_wait(&h->watch_condvar, &h->watch_mutex); 10.346 + msg = list_top(&h->watch_list, struct xs_stored_msg, list); 10.347 + list_del(&msg->list); 10.348 10.349 - if (!read_all(h->fd, strings, msg.len)) { 10.350 - free_no_errno(strings); 10.351 - return NULL; 10.352 - } 10.353 + /* Clear the pipe token if there are no more pending watches. */ 10.354 + if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1)) 10.355 + while (read(h->watch_pipe[0], &c, 1) != 1) 10.356 + continue; 10.357 + 10.358 + pthread_mutex_unlock(&h->watch_mutex); 10.359 10.360 - num_strings = xs_count_strings(strings, msg.len); 10.361 + assert(msg->hdr.type == XS_WATCH_EVENT); 10.362 10.363 - ret = malloc(sizeof(char*) * num_strings + msg.len); 10.364 + strings = msg->body; 10.365 + num_strings = xs_count_strings(strings, msg->hdr.len); 10.366 + 10.367 + ret = malloc(sizeof(char*) * num_strings + msg->hdr.len); 10.368 if (!ret) { 10.369 free_no_errno(strings); 10.370 + free_no_errno(msg); 10.371 return NULL; 10.372 } 10.373 10.374 ret[0] = (char *)(ret + num_strings); 10.375 - memcpy(ret[0], strings, msg.len); 10.376 - free(strings); 10.377 + memcpy(ret[0], strings, msg->hdr.len); 10.378 10.379 - for (i = 1; i < num_strings; i++) { 10.380 + free(strings); 10.381 + free(msg); 10.382 + 10.383 + for (i = 1; i < num_strings; i++) 10.384 ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1; 10.385 - } 10.386 10.387 *num = num_strings; 10.388 10.389 @@ -519,6 +624,7 @@ bool xs_unwatch(struct xs_handle *h, con 10.390 */ 10.391 bool xs_transaction_start(struct xs_handle *h) 10.392 { 10.393 + pthread_mutex_lock(&h->transaction_mutex); 10.394 return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL)); 10.395 } 10.396 10.397 @@ -530,12 +636,18 @@ bool xs_transaction_start(struct xs_hand 10.398 bool xs_transaction_end(struct xs_handle *h, bool abort) 10.399 { 10.400 char abortstr[2]; 10.401 + bool rc; 10.402 10.403 if (abort) 10.404 strcpy(abortstr, "F"); 10.405 else 10.406 strcpy(abortstr, "T"); 10.407 - return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); 10.408 + 10.409 + rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); 10.410 + 10.411 + pthread_mutex_unlock(&h->transaction_mutex); 10.412 + 10.413 + return rc; 10.414 } 10.415 10.416 /* Introduce a new domain. 10.417 @@ -584,18 +696,6 @@ char *xs_get_domain_path(struct xs_handl 10.418 return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL); 10.419 } 10.420 10.421 -bool xs_shutdown(struct xs_handle *h) 10.422 -{ 10.423 - bool ret = xs_bool(xs_single(h, XS_SHUTDOWN, "", NULL)); 10.424 - if (ret) { 10.425 - char c; 10.426 - /* Wait for it to actually shutdown. */ 10.427 - while ((read(h->fd, &c, 1) < 0) && (errno == EINTR)) 10.428 - continue; 10.429 - } 10.430 - return ret; 10.431 -} 10.432 - 10.433 /* Only useful for DEBUG versions */ 10.434 char *xs_debug_command(struct xs_handle *h, const char *cmd, 10.435 void *data, unsigned int len) 10.436 @@ -609,3 +709,75 @@ char *xs_debug_command(struct xs_handle 10.437 10.438 return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL); 10.439 } 10.440 + 10.441 +static void *read_thread(void *arg) 10.442 +{ 10.443 + struct xs_handle *h = arg; 10.444 + struct xs_stored_msg *msg = NULL; 10.445 + char *body = NULL; 10.446 + 10.447 + for (;;) { 10.448 + msg = NULL; 10.449 + body = NULL; 10.450 + 10.451 + /* Allocate message structure and read the message header. */ 10.452 + msg = malloc(sizeof(*msg)); 10.453 + if (msg == NULL) 10.454 + goto error; 10.455 + if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr))) 10.456 + goto error; 10.457 + 10.458 + /* Allocate and read the message body. */ 10.459 + body = msg->body = malloc(msg->hdr.len + 1); 10.460 + if (body == NULL) 10.461 + goto error; 10.462 + if (!read_all(h->fd, body, msg->hdr.len)) 10.463 + goto error; 10.464 + body[msg->hdr.len] = '\0'; 10.465 + 10.466 + if (msg->hdr.type == XS_WATCH_EVENT) { 10.467 + pthread_mutex_lock(&h->watch_mutex); 10.468 + 10.469 + /* Kick users out of their select() loop. */ 10.470 + if (list_empty(&h->watch_list) && 10.471 + (h->watch_pipe[1] != -1)) 10.472 + while (write(h->watch_pipe[1], body, 1) != 1) 10.473 + continue; 10.474 + 10.475 + list_add_tail(&msg->list, &h->watch_list); 10.476 + pthread_cond_signal(&h->watch_condvar); 10.477 + 10.478 + pthread_mutex_unlock(&h->watch_mutex); 10.479 + } else { 10.480 + pthread_mutex_lock(&h->reply_mutex); 10.481 + 10.482 + /* There should only ever be one response pending! */ 10.483 + if (!list_empty(&h->reply_list)) { 10.484 + pthread_mutex_unlock(&h->reply_mutex); 10.485 + goto error; 10.486 + } 10.487 + 10.488 + list_add_tail(&msg->list, &h->reply_list); 10.489 + pthread_cond_signal(&h->reply_condvar); 10.490 + 10.491 + pthread_mutex_unlock(&h->reply_mutex); 10.492 + } 10.493 + } 10.494 + 10.495 + error: 10.496 + if (body != NULL) 10.497 + free(body); 10.498 + if (msg != NULL) 10.499 + free(msg); 10.500 + return NULL; 10.501 +} 10.502 + 10.503 +/* 10.504 + * Local variables: 10.505 + * c-file-style: "linux" 10.506 + * indent-tabs-mode: t 10.507 + * c-indent-level: 8 10.508 + * c-basic-offset: 8 10.509 + * tab-width: 8 10.510 + * End: 10.511 + */
11.1 --- a/tools/xenstore/xs.h Sat Oct 08 10:22:01 2005 +0100 11.2 +++ b/tools/xenstore/xs.h Sat Oct 08 19:19:27 2005 +0100 11.3 @@ -141,7 +141,4 @@ char *xs_get_domain_path(struct xs_handl 11.4 char *xs_debug_command(struct xs_handle *h, const char *cmd, 11.5 void *data, unsigned int len); 11.6 11.7 -/* Shut down the daemon. */ 11.8 -bool xs_shutdown(struct xs_handle *h); 11.9 - 11.10 #endif /* _XS_H */
12.1 --- a/tools/xenstore/xs_random.c Sat Oct 08 10:22:01 2005 +0100 12.2 +++ b/tools/xenstore/xs_random.c Sat Oct 08 19:19:27 2005 +0100 12.3 @@ -879,20 +879,11 @@ static int daemon_pid; 12.4 static void cleanup_xs_ops(void) 12.5 { 12.6 char *cmd; 12.7 + 12.8 if (daemon_pid) { 12.9 - struct xs_handle *h; 12.10 - h = xs_daemon_open(); 12.11 - if (h) { 12.12 - if (xs_shutdown(h)) { 12.13 - waitpid(daemon_pid, NULL, 0); 12.14 - daemon_pid = 0; 12.15 - } 12.16 - xs_daemon_close(h); 12.17 - } 12.18 - if (daemon_pid) { 12.19 - kill(daemon_pid, SIGTERM); 12.20 - waitpid(daemon_pid, NULL, 0); 12.21 - } 12.22 + kill(daemon_pid, SIGTERM); 12.23 + waitpid(daemon_pid, NULL, 0); 12.24 + daemon_pid = 0; 12.25 } 12.26 12.27 cmd = talloc_asprintf(NULL, "rm -rf testsuite/tmp/*");
13.1 --- a/tools/xenstore/xs_test.c Sat Oct 08 10:22:01 2005 +0100 13.2 +++ b/tools/xenstore/xs_test.c Sat Oct 08 19:19:27 2005 +0100 13.3 @@ -198,7 +198,6 @@ static void __attribute__((noreturn)) us 13.4 " rm <path>\n" 13.5 " getperm <path>\n" 13.6 " setperm <path> <id> <flags> ...\n" 13.7 - " shutdown\n" 13.8 " watch <path> <token>\n" 13.9 " watchnoack <path> <token>\n" 13.10 " waitwatch\n" 13.11 @@ -214,8 +213,6 @@ static void __attribute__((noreturn)) us 13.12 " notimeout\n" 13.13 " readonly\n" 13.14 " readwrite\n" 13.15 - " noackwrite <path> <value>...\n" 13.16 - " readack\n" 13.17 " dump\n"); 13.18 } 13.19 13.20 @@ -355,37 +352,6 @@ static void do_write(unsigned int handle 13.21 failed(handle); 13.22 } 13.23 13.24 -static void do_noackwrite(unsigned int handle, 13.25 - char *path, char *data) 13.26 -{ 13.27 - struct xsd_sockmsg msg; 13.28 - 13.29 - msg.len = strlen(path) + 1 + strlen(data); 13.30 - msg.type = XS_WRITE; 13.31 - if (!write_all_choice(handles[handle]->fd, &msg, sizeof(msg))) 13.32 - failed(handle); 13.33 - if (!write_all_choice(handles[handle]->fd, path, strlen(path) + 1)) 13.34 - failed(handle); 13.35 - if (!write_all_choice(handles[handle]->fd, data, strlen(data))) 13.36 - failed(handle); 13.37 - /* Do not wait for ack. */ 13.38 -} 13.39 - 13.40 -static void do_readack(unsigned int handle) 13.41 -{ 13.42 - enum xsd_sockmsg_type type; 13.43 - char *ret = NULL; 13.44 - 13.45 - /* Watches can have fired before reply comes: daemon detects 13.46 - * and re-transmits, so we can ignore this. */ 13.47 - do { 13.48 - free(ret); 13.49 - ret = read_reply(handles[handle]->fd, &type, NULL); 13.50 - if (!ret) 13.51 - failed(handle); 13.52 - } while (type == XS_WATCH_EVENT); 13.53 -} 13.54 - 13.55 static void do_setid(unsigned int handle, char *id) 13.56 { 13.57 if (!xs_bool(xs_debug_command(handles[handle], "setid", id, 13.58 @@ -475,12 +441,6 @@ static void do_setperm(unsigned int hand 13.59 failed(handle); 13.60 } 13.61 13.62 -static void do_shutdown(unsigned int handle) 13.63 -{ 13.64 - if (!xs_shutdown(handles[handle])) 13.65 - failed(handle); 13.66 -} 13.67 - 13.68 static void do_watch(unsigned int handle, const char *node, const char *token, 13.69 bool swallow_event) 13.70 { 13.71 @@ -780,8 +740,6 @@ static void do_command(unsigned int defa 13.72 do_getperm(handle, arg(line, 1)); 13.73 else if (streq(command, "setperm")) 13.74 do_setperm(handle, arg(line, 1), line); 13.75 - else if (streq(command, "shutdown")) 13.76 - do_shutdown(handle); 13.77 else if (streq(command, "watch")) 13.78 do_watch(handle, arg(line, 1), arg(line, 2), true); 13.79 else if (streq(command, "watchnoack")) 13.80 @@ -823,11 +781,7 @@ static void do_command(unsigned int defa 13.81 readonly = false; 13.82 xs_daemon_close(handles[handle]); 13.83 handles[handle] = NULL; 13.84 - } else if (streq(command, "noackwrite")) 13.85 - do_noackwrite(handle, arg(line,1), arg(line,2)); 13.86 - else if (streq(command, "readack")) 13.87 - do_readack(handle); 13.88 - else 13.89 + } else 13.90 barf("Unknown command %s", command); 13.91 fflush(stdout); 13.92 disarm_timeout();