ia64/xen-unstable

view tools/xenstore/xs.c @ 15870:2635119a1766

xenstore: Remove incorrect comment.
Signed-off-by: Keir Fraser <keir@xensource.com>
author kfraser@localhost.localdomain
date Tue Sep 11 08:34:04 2007 +0100 (2007-09-11)
parents 9a1809ce711b
children 61ef23e45e9c
line source
1 /*
2 Xen Store Daemon interface providing simple tree-like database.
3 Copyright (C) 2005 Rusty Russell IBM Corporation
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 2.1 of the License, or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <sys/socket.h>
24 #include <sys/un.h>
25 #include <string.h>
26 #include <unistd.h>
27 #include <stdbool.h>
28 #include <stdlib.h>
29 #include <assert.h>
30 #include <stdio.h>
31 #include <signal.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <pthread.h>
35 #include "xs.h"
36 #include "list.h"
37 #include "utils.h"
39 struct xs_stored_msg {
40 struct list_head list;
41 struct xsd_sockmsg hdr;
42 char *body;
43 };
45 struct xs_handle {
46 /* Communications channel to xenstore daemon. */
47 int fd;
49 /*
50 * A read thread which pulls messages off the comms channel and
51 * signals waiters.
52 */
53 pthread_t read_thr;
54 int read_thr_exists;
56 /*
57 * A list of fired watch messages, protected by a mutex. Users can
58 * wait on the conditional variable until a watch is pending.
59 */
60 struct list_head watch_list;
61 pthread_mutex_t watch_mutex;
62 pthread_cond_t watch_condvar;
64 /* Clients can select() on this pipe to wait for a watch to fire. */
65 int watch_pipe[2];
67 /*
68 * A list of replies. Currently only one will ever be outstanding
69 * because we serialise requests. The requester can wait on the
70 * conditional variable for its response.
71 */
72 struct list_head reply_list;
73 pthread_mutex_t reply_mutex;
74 pthread_cond_t reply_condvar;
76 /* One request at a time. */
77 pthread_mutex_t request_mutex;
78 };
80 static int read_message(struct xs_handle *h);
81 static void *read_thread(void *arg);
83 int xs_fileno(struct xs_handle *h)
84 {
85 char c = 0;
87 pthread_mutex_lock(&h->watch_mutex);
89 if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
90 /* Kick things off if the watch list is already non-empty. */
91 if (!list_empty(&h->watch_list))
92 while (write(h->watch_pipe[1], &c, 1) != 1)
93 continue;
94 }
96 pthread_mutex_unlock(&h->watch_mutex);
98 return h->watch_pipe[0];
99 }
101 static int get_socket(const char *connect_to)
102 {
103 struct sockaddr_un addr;
104 int sock, saved_errno, flags;
106 sock = socket(PF_UNIX, SOCK_STREAM, 0);
107 if (sock < 0)
108 return -1;
110 if ((flags = fcntl(sock, F_GETFD)) < 0)
111 goto error;
112 flags |= FD_CLOEXEC;
113 if (fcntl(sock, F_SETFD, flags) < 0)
114 goto error;
116 addr.sun_family = AF_UNIX;
117 strcpy(addr.sun_path, connect_to);
119 if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0)
120 goto error;
122 return sock;
124 error:
125 saved_errno = errno;
126 close(sock);
127 errno = saved_errno;
128 return -1;
129 }
131 static int get_dev(const char *connect_to)
132 {
133 return open(connect_to, O_RDWR);
134 }
136 static struct xs_handle *get_handle(const char *connect_to)
137 {
138 struct stat buf;
139 struct xs_handle *h = NULL;
140 int fd = -1, saved_errno;
142 if (stat(connect_to, &buf) != 0)
143 return NULL;
145 if (S_ISSOCK(buf.st_mode))
146 fd = get_socket(connect_to);
147 else
148 fd = get_dev(connect_to);
150 if (fd == -1)
151 return NULL;
153 h = malloc(sizeof(*h));
154 if (h == NULL) {
155 saved_errno = errno;
156 close(fd);
157 errno = saved_errno;
158 return NULL;
159 }
161 memset(h, 0, sizeof(*h));
163 h->fd = fd;
165 /* Watch pipe is allocated on demand in xs_fileno(). */
166 h->watch_pipe[0] = h->watch_pipe[1] = -1;
168 INIT_LIST_HEAD(&h->watch_list);
169 pthread_mutex_init(&h->watch_mutex, NULL);
170 pthread_cond_init(&h->watch_condvar, NULL);
172 INIT_LIST_HEAD(&h->reply_list);
173 pthread_mutex_init(&h->reply_mutex, NULL);
174 pthread_cond_init(&h->reply_condvar, NULL);
176 pthread_mutex_init(&h->request_mutex, NULL);
178 return h;
179 }
181 struct xs_handle *xs_daemon_open(void)
182 {
183 return get_handle(xs_daemon_socket());
184 }
186 struct xs_handle *xs_daemon_open_readonly(void)
187 {
188 return get_handle(xs_daemon_socket_ro());
189 }
191 struct xs_handle *xs_domain_open(void)
192 {
193 return get_handle(xs_domain_dev());
194 }
196 void xs_daemon_close(struct xs_handle *h)
197 {
198 struct xs_stored_msg *msg, *tmsg;
200 pthread_mutex_lock(&h->request_mutex);
201 pthread_mutex_lock(&h->reply_mutex);
202 pthread_mutex_lock(&h->watch_mutex);
204 if (h->read_thr_exists) {
205 /* XXX FIXME: May leak an unpublished message buffer. */
206 pthread_cancel(h->read_thr);
207 pthread_join(h->read_thr, NULL);
208 }
210 list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
211 free(msg->body);
212 free(msg);
213 }
215 list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
216 free(msg->body);
217 free(msg);
218 }
220 pthread_mutex_unlock(&h->request_mutex);
221 pthread_mutex_unlock(&h->reply_mutex);
222 pthread_mutex_unlock(&h->watch_mutex);
224 if (h->watch_pipe[0] != -1) {
225 close(h->watch_pipe[0]);
226 close(h->watch_pipe[1]);
227 }
229 close(h->fd);
231 free(h);
232 }
234 static bool read_all(int fd, void *data, unsigned int len)
235 {
236 while (len) {
237 int done;
239 done = read(fd, data, len);
240 if (done < 0) {
241 if (errno == EINTR)
242 continue;
243 return false;
244 }
245 if (done == 0) {
246 /* It closed fd on us? EBADF is appropriate. */
247 errno = EBADF;
248 return false;
249 }
250 data += done;
251 len -= done;
252 }
254 return true;
255 }
257 #ifdef XSTEST
258 #define read_all read_all_choice
259 #define xs_write_all write_all_choice
260 #endif
262 static int get_error(const char *errorstring)
263 {
264 unsigned int i;
266 for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++)
267 if (i == ARRAY_SIZE(xsd_errors) - 1)
268 return EINVAL;
269 return xsd_errors[i].errnum;
270 }
272 /* Adds extra nul terminator, because we generally (always?) hold strings. */
273 static void *read_reply(
274 struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
275 {
276 struct xs_stored_msg *msg;
277 char *body;
279 /* Read from comms channel ourselves if there is no reader thread. */
280 if (!h->read_thr_exists && (read_message(h) == -1))
281 return NULL;
283 pthread_mutex_lock(&h->reply_mutex);
284 while (list_empty(&h->reply_list))
285 pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
286 msg = list_top(&h->reply_list, struct xs_stored_msg, list);
287 list_del(&msg->list);
288 assert(list_empty(&h->reply_list));
289 pthread_mutex_unlock(&h->reply_mutex);
291 *type = msg->hdr.type;
292 if (len)
293 *len = msg->hdr.len;
294 body = msg->body;
296 free(msg);
298 return body;
299 }
301 /* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
302 static void *xs_talkv(struct xs_handle *h, xs_transaction_t t,
303 enum xsd_sockmsg_type type,
304 const struct iovec *iovec,
305 unsigned int num_vecs,
306 unsigned int *len)
307 {
308 struct xsd_sockmsg msg;
309 void *ret = NULL;
310 int saved_errno;
311 unsigned int i;
312 struct sigaction ignorepipe, oldact;
314 msg.tx_id = t;
315 msg.req_id = 0;
316 msg.type = type;
317 msg.len = 0;
318 for (i = 0; i < num_vecs; i++)
319 msg.len += iovec[i].iov_len;
321 ignorepipe.sa_handler = SIG_IGN;
322 sigemptyset(&ignorepipe.sa_mask);
323 ignorepipe.sa_flags = 0;
324 sigaction(SIGPIPE, &ignorepipe, &oldact);
326 pthread_mutex_lock(&h->request_mutex);
328 if (!xs_write_all(h->fd, &msg, sizeof(msg)))
329 goto fail;
331 for (i = 0; i < num_vecs; i++)
332 if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
333 goto fail;
335 ret = read_reply(h, &msg.type, len);
336 if (!ret)
337 goto fail;
339 pthread_mutex_unlock(&h->request_mutex);
341 sigaction(SIGPIPE, &oldact, NULL);
342 if (msg.type == XS_ERROR) {
343 saved_errno = get_error(ret);
344 free(ret);
345 errno = saved_errno;
346 return NULL;
347 }
349 if (msg.type != type) {
350 free(ret);
351 saved_errno = EBADF;
352 goto close_fd;
353 }
354 return ret;
356 fail:
357 /* We're in a bad state, so close fd. */
358 saved_errno = errno;
359 pthread_mutex_unlock(&h->request_mutex);
360 sigaction(SIGPIPE, &oldact, NULL);
361 close_fd:
362 close(h->fd);
363 h->fd = -1;
364 errno = saved_errno;
365 return NULL;
366 }
368 /* free(), but don't change errno. */
369 static void free_no_errno(void *p)
370 {
371 int saved_errno = errno;
372 free(p);
373 errno = saved_errno;
374 }
376 /* Simplified version of xs_talkv: single message. */
377 static void *xs_single(struct xs_handle *h, xs_transaction_t t,
378 enum xsd_sockmsg_type type,
379 const char *string,
380 unsigned int *len)
381 {
382 struct iovec iovec;
384 iovec.iov_base = (void *)string;
385 iovec.iov_len = strlen(string) + 1;
386 return xs_talkv(h, t, type, &iovec, 1, len);
387 }
389 static bool xs_bool(char *reply)
390 {
391 if (!reply)
392 return false;
393 free(reply);
394 return true;
395 }
397 char **xs_directory(struct xs_handle *h, xs_transaction_t t,
398 const char *path, unsigned int *num)
399 {
400 char *strings, *p, **ret;
401 unsigned int len;
403 strings = xs_single(h, t, XS_DIRECTORY, path, &len);
404 if (!strings)
405 return NULL;
407 /* Count the strings. */
408 *num = xs_count_strings(strings, len);
410 /* Transfer to one big alloc for easy freeing. */
411 ret = malloc(*num * sizeof(char *) + len);
412 if (!ret) {
413 free_no_errno(strings);
414 return NULL;
415 }
416 memcpy(&ret[*num], strings, len);
417 free_no_errno(strings);
419 strings = (char *)&ret[*num];
420 for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
421 ret[(*num)++] = p;
422 return ret;
423 }
425 /* Get the value of a single file, nul terminated.
426 * Returns a malloced value: call free() on it after use.
427 * len indicates length in bytes, not including the nul.
428 */
429 void *xs_read(struct xs_handle *h, xs_transaction_t t,
430 const char *path, unsigned int *len)
431 {
432 return xs_single(h, t, XS_READ, path, len);
433 }
435 /* Write the value of a single file.
436 * Returns false on failure.
437 */
438 bool xs_write(struct xs_handle *h, xs_transaction_t t,
439 const char *path, const void *data, unsigned int len)
440 {
441 struct iovec iovec[2];
443 iovec[0].iov_base = (void *)path;
444 iovec[0].iov_len = strlen(path) + 1;
445 iovec[1].iov_base = (void *)data;
446 iovec[1].iov_len = len;
448 return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
449 ARRAY_SIZE(iovec), NULL));
450 }
452 /* Create a new directory.
453 * Returns false on failure, or success if it already exists.
454 */
455 bool xs_mkdir(struct xs_handle *h, xs_transaction_t t,
456 const char *path)
457 {
458 return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
459 }
461 /* Destroy a file or directory (directories must be empty).
462 * Returns false on failure, or success if it doesn't exist.
463 */
464 bool xs_rm(struct xs_handle *h, xs_transaction_t t,
465 const char *path)
466 {
467 return xs_bool(xs_single(h, t, XS_RM, path, NULL));
468 }
470 /* Get permissions of node (first element is owner).
471 * Returns malloced array, or NULL: call free() after use.
472 */
473 struct xs_permissions *xs_get_permissions(struct xs_handle *h,
474 xs_transaction_t t,
475 const char *path, unsigned int *num)
476 {
477 char *strings;
478 unsigned int len;
479 struct xs_permissions *ret;
481 strings = xs_single(h, t, XS_GET_PERMS, path, &len);
482 if (!strings)
483 return NULL;
485 /* Count the strings: each one perms then domid. */
486 *num = xs_count_strings(strings, len);
488 /* Transfer to one big alloc for easy freeing. */
489 ret = malloc(*num * sizeof(struct xs_permissions));
490 if (!ret) {
491 free_no_errno(strings);
492 return NULL;
493 }
495 if (!xs_strings_to_perms(ret, *num, strings)) {
496 free_no_errno(ret);
497 ret = NULL;
498 }
500 free(strings);
501 return ret;
502 }
504 /* Set permissions of node (must be owner).
505 * Returns false on failure.
506 */
507 bool xs_set_permissions(struct xs_handle *h,
508 xs_transaction_t t,
509 const char *path,
510 struct xs_permissions *perms,
511 unsigned int num_perms)
512 {
513 unsigned int i;
514 struct iovec iov[1+num_perms];
516 iov[0].iov_base = (void *)path;
517 iov[0].iov_len = strlen(path) + 1;
519 for (i = 0; i < num_perms; i++) {
520 char buffer[MAX_STRLEN(unsigned int)+1];
522 if (!xs_perm_to_string(&perms[i], buffer))
523 goto unwind;
525 iov[i+1].iov_base = strdup(buffer);
526 iov[i+1].iov_len = strlen(buffer) + 1;
527 if (!iov[i+1].iov_base)
528 goto unwind;
529 }
531 if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
532 goto unwind;
533 for (i = 0; i < num_perms; i++)
534 free(iov[i+1].iov_base);
535 return true;
537 unwind:
538 num_perms = i;
539 for (i = 0; i < num_perms; i++)
540 free_no_errno(iov[i+1].iov_base);
541 return false;
542 }
544 /* Watch a node for changes (poll on fd to detect, or call read_watch()).
545 * When the node (or any child) changes, fd will become readable.
546 * Token is returned when watch is read, to allow matching.
547 * Returns false on failure.
548 */
549 bool xs_watch(struct xs_handle *h, const char *path, const char *token)
550 {
551 struct iovec iov[2];
553 /* We dynamically create a reader thread on demand. */
554 pthread_mutex_lock(&h->request_mutex);
555 if (!h->read_thr_exists) {
556 if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
557 pthread_mutex_unlock(&h->request_mutex);
558 return false;
559 }
560 h->read_thr_exists = 1;
561 }
562 pthread_mutex_unlock(&h->request_mutex);
564 iov[0].iov_base = (void *)path;
565 iov[0].iov_len = strlen(path) + 1;
566 iov[1].iov_base = (void *)token;
567 iov[1].iov_len = strlen(token) + 1;
569 return xs_bool(xs_talkv(h, XBT_NULL, XS_WATCH, iov,
570 ARRAY_SIZE(iov), NULL));
571 }
573 /* Find out what node change was on (will block if nothing pending).
574 * Returns array of two pointers: path and token, or NULL.
575 * Call free() after use.
576 */
577 char **xs_read_watch(struct xs_handle *h, unsigned int *num)
578 {
579 struct xs_stored_msg *msg;
580 char **ret, *strings, c = 0;
581 unsigned int num_strings, i;
583 pthread_mutex_lock(&h->watch_mutex);
585 /* Wait on the condition variable for a watch to fire. */
586 while (list_empty(&h->watch_list))
587 pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
588 msg = list_top(&h->watch_list, struct xs_stored_msg, list);
589 list_del(&msg->list);
591 /* Clear the pipe token if there are no more pending watches. */
592 if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
593 while (read(h->watch_pipe[0], &c, 1) != 1)
594 continue;
596 pthread_mutex_unlock(&h->watch_mutex);
598 assert(msg->hdr.type == XS_WATCH_EVENT);
600 strings = msg->body;
601 num_strings = xs_count_strings(strings, msg->hdr.len);
603 ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
604 if (!ret) {
605 free_no_errno(strings);
606 free_no_errno(msg);
607 return NULL;
608 }
610 ret[0] = (char *)(ret + num_strings);
611 memcpy(ret[0], strings, msg->hdr.len);
613 free(strings);
614 free(msg);
616 for (i = 1; i < num_strings; i++)
617 ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
619 *num = num_strings;
621 return ret;
622 }
624 /* Remove a watch on a node.
625 * Returns false on failure (no watch on that node).
626 */
627 bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
628 {
629 struct iovec iov[2];
631 iov[0].iov_base = (char *)path;
632 iov[0].iov_len = strlen(path) + 1;
633 iov[1].iov_base = (char *)token;
634 iov[1].iov_len = strlen(token) + 1;
636 return xs_bool(xs_talkv(h, XBT_NULL, XS_UNWATCH, iov,
637 ARRAY_SIZE(iov), NULL));
638 }
640 /* Start a transaction: changes by others will not be seen during this
641 * transaction, and changes will not be visible to others until end.
642 * Returns XBT_NULL on failure.
643 */
644 xs_transaction_t xs_transaction_start(struct xs_handle *h)
645 {
646 char *id_str;
647 xs_transaction_t id;
649 id_str = xs_single(h, XBT_NULL, XS_TRANSACTION_START, "", NULL);
650 if (id_str == NULL)
651 return XBT_NULL;
653 id = strtoul(id_str, NULL, 0);
654 free(id_str);
656 return id;
657 }
659 /* End a transaction.
660 * If abandon is true, transaction is discarded instead of committed.
661 * Returns false on failure, which indicates an error: transactions will
662 * not fail spuriously.
663 */
664 bool xs_transaction_end(struct xs_handle *h, xs_transaction_t t,
665 bool abort)
666 {
667 char abortstr[2];
669 if (abort)
670 strcpy(abortstr, "F");
671 else
672 strcpy(abortstr, "T");
674 return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
675 }
677 /* Introduce a new domain.
678 * This tells the store daemon about a shared memory page and event channel
679 * associated with a domain: the domain uses these to communicate.
680 */
681 bool xs_introduce_domain(struct xs_handle *h,
682 unsigned int domid, unsigned long mfn,
683 unsigned int eventchn)
684 {
685 char domid_str[MAX_STRLEN(domid)];
686 char mfn_str[MAX_STRLEN(mfn)];
687 char eventchn_str[MAX_STRLEN(eventchn)];
688 struct iovec iov[3];
690 sprintf(domid_str, "%u", domid);
691 sprintf(mfn_str, "%lu", mfn);
692 sprintf(eventchn_str, "%u", eventchn);
694 iov[0].iov_base = domid_str;
695 iov[0].iov_len = strlen(domid_str) + 1;
696 iov[1].iov_base = mfn_str;
697 iov[1].iov_len = strlen(mfn_str) + 1;
698 iov[2].iov_base = eventchn_str;
699 iov[2].iov_len = strlen(eventchn_str) + 1;
701 return xs_bool(xs_talkv(h, XBT_NULL, XS_INTRODUCE, iov,
702 ARRAY_SIZE(iov), NULL));
703 }
705 static void * single_with_domid(struct xs_handle *h,
706 enum xsd_sockmsg_type type,
707 unsigned int domid)
708 {
709 char domid_str[MAX_STRLEN(domid)];
711 sprintf(domid_str, "%u", domid);
713 return xs_single(h, XBT_NULL, type, domid_str, NULL);
714 }
716 bool xs_release_domain(struct xs_handle *h, unsigned int domid)
717 {
718 return xs_bool(single_with_domid(h, XS_RELEASE, domid));
719 }
721 /* clear the shutdown bit for the given domain */
722 bool xs_resume_domain(struct xs_handle *h, unsigned int domid)
723 {
724 return xs_bool(single_with_domid(h, XS_RESUME, domid));
725 }
727 char *xs_get_domain_path(struct xs_handle *h, unsigned int domid)
728 {
729 char domid_str[MAX_STRLEN(domid)];
731 sprintf(domid_str, "%u", domid);
733 return xs_single(h, XBT_NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
734 }
736 bool xs_is_domain_introduced(struct xs_handle *h, unsigned int domid)
737 {
738 return strcmp("F",
739 single_with_domid(h, XS_IS_DOMAIN_INTRODUCED, domid));
740 }
742 /* Only useful for DEBUG versions */
743 char *xs_debug_command(struct xs_handle *h, const char *cmd,
744 void *data, unsigned int len)
745 {
746 struct iovec iov[2];
748 iov[0].iov_base = (void *)cmd;
749 iov[0].iov_len = strlen(cmd) + 1;
750 iov[1].iov_base = data;
751 iov[1].iov_len = len;
753 return xs_talkv(h, XBT_NULL, XS_DEBUG, iov,
754 ARRAY_SIZE(iov), NULL);
755 }
757 static int read_message(struct xs_handle *h)
758 {
759 struct xs_stored_msg *msg = NULL;
760 char *body = NULL;
761 int saved_errno;
763 /* Allocate message structure and read the message header. */
764 msg = malloc(sizeof(*msg));
765 if (msg == NULL)
766 goto error;
767 if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
768 goto error;
770 /* Allocate and read the message body. */
771 body = msg->body = malloc(msg->hdr.len + 1);
772 if (body == NULL)
773 goto error;
774 if (!read_all(h->fd, body, msg->hdr.len))
775 goto error;
776 body[msg->hdr.len] = '\0';
778 if (msg->hdr.type == XS_WATCH_EVENT) {
779 pthread_mutex_lock(&h->watch_mutex);
781 /* Kick users out of their select() loop. */
782 if (list_empty(&h->watch_list) &&
783 (h->watch_pipe[1] != -1))
784 while (write(h->watch_pipe[1], body, 1) != 1)
785 continue;
787 list_add_tail(&msg->list, &h->watch_list);
788 pthread_cond_signal(&h->watch_condvar);
790 pthread_mutex_unlock(&h->watch_mutex);
791 } else {
792 pthread_mutex_lock(&h->reply_mutex);
794 /* There should only ever be one response pending! */
795 if (!list_empty(&h->reply_list)) {
796 pthread_mutex_unlock(&h->reply_mutex);
797 goto error;
798 }
800 list_add_tail(&msg->list, &h->reply_list);
801 pthread_cond_signal(&h->reply_condvar);
803 pthread_mutex_unlock(&h->reply_mutex);
804 }
806 return 0;
808 error:
809 saved_errno = errno;
810 free(msg);
811 free(body);
812 errno = saved_errno;
813 return -1;
814 }
816 static void *read_thread(void *arg)
817 {
818 struct xs_handle *h = arg;
820 while (read_message(h) != -1)
821 continue;
823 return NULL;
824 }
826 /*
827 * Local variables:
828 * c-file-style: "linux"
829 * indent-tabs-mode: t
830 * c-indent-level: 8
831 * c-basic-offset: 8
832 * tab-width: 8
833 * End:
834 */