ia64/xen-unstable

view tools/xenstore/xs.c @ 7901:b7afc0be59b2

Initialise the request ID in xs_talkv, to silence valgrind. We're not actually
using this value at the moment -- it's only there for use by future clients.

Signed-off-by: Ewan Mellor <ewan@xensource.com>
author emellor@leeni.uk.xensource.com
date Thu Nov 17 20:25:47 2005 +0100 (2005-11-17)
parents a90d670c98b9
children 991ccc24bf2e
line source
1 /*
2 Xen Store Daemon interface providing simple tree-like database.
3 Copyright (C) 2005 Rusty Russell IBM Corporation
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 2.1 of the License, or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <sys/socket.h>
24 #include <sys/un.h>
25 #include <string.h>
26 #include <unistd.h>
27 #include <stdbool.h>
28 #include <stdlib.h>
29 #include <assert.h>
30 #include <stdio.h>
31 #include <signal.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <sys/ioctl.h>
35 #include <pthread.h>
36 #include "xs.h"
37 #include "list.h"
38 #include "utils.h"
40 struct xs_stored_msg {
41 struct list_head list;
42 struct xsd_sockmsg hdr;
43 char *body;
44 };
46 struct xs_handle {
47 /* Communications channel to xenstore daemon. */
48 int fd;
50 /*
51 * A read thread which pulls messages off the comms channel and
52 * signals waiters.
53 */
54 pthread_t read_thr;
55 int read_thr_exists;
57 /*
58 * A list of fired watch messages, protected by a mutex. Users can
59 * wait on the conditional variable until a watch is pending.
60 */
61 struct list_head watch_list;
62 pthread_mutex_t watch_mutex;
63 pthread_cond_t watch_condvar;
65 /* Clients can select() on this pipe to wait for a watch to fire. */
66 int watch_pipe[2];
68 /*
69 * A list of replies. Currently only one will ever be outstanding
70 * because we serialise requests. The requester can wait on the
71 * conditional variable for its response.
72 */
73 struct list_head reply_list;
74 pthread_mutex_t reply_mutex;
75 pthread_cond_t reply_condvar;
77 /* One request at a time. */
78 pthread_mutex_t request_mutex;
79 };
81 static int read_message(struct xs_handle *h);
82 static void *read_thread(void *arg);
84 int xs_fileno(struct xs_handle *h)
85 {
86 char c = 0;
88 pthread_mutex_lock(&h->watch_mutex);
90 if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
91 /* Kick things off if the watch list is already non-empty. */
92 if (!list_empty(&h->watch_list))
93 while (write(h->watch_pipe[1], &c, 1) != 1)
94 continue;
95 }
97 pthread_mutex_unlock(&h->watch_mutex);
99 return h->watch_pipe[0];
100 }
/*
 * Connect to the UNIX-domain stream socket at connect_to.
 *
 * Returns the connected descriptor, or -1 with errno set.  A path that does
 * not fit in sockaddr_un.sun_path together with its nul terminator is
 * rejected with ENAMETOOLONG instead of being copied past the end of the
 * address structure.
 */
static int get_socket(const char *connect_to)
{
	struct sockaddr_un addr;
	size_t path_len;
	int sock, saved_errno;

	/* Validate before creating anything so there is nothing to undo. */
	path_len = strlen(connect_to);
	if (path_len >= sizeof(addr.sun_path)) {
		errno = ENAMETOOLONG;
		return -1;
	}

	sock = socket(PF_UNIX, SOCK_STREAM, 0);
	if (sock < 0)
		return -1;

	/* Zero the whole address: sizeof(addr) bytes are given to connect(). */
	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	memcpy(addr.sun_path, connect_to, path_len + 1);

	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
		/* close() may overwrite errno; report connect()'s failure. */
		saved_errno = errno;
		close(sock);
		errno = saved_errno;
		return -1;
	}

	return sock;
}
/*
 * Open the node at connect_to for reading and writing.
 * Returns open()'s result unchanged.
 */
static int get_dev(const char *connect_to)
{
	const int dev_fd = open(connect_to, O_RDWR);

	return dev_fd;
}
129 static struct xs_handle *get_handle(const char *connect_to)
130 {
131 struct stat buf;
132 struct xs_handle *h = NULL;
133 int fd = -1, saved_errno;
135 if (stat(connect_to, &buf) != 0)
136 return NULL;
138 if (S_ISSOCK(buf.st_mode))
139 fd = get_socket(connect_to);
140 else
141 fd = get_dev(connect_to);
143 if (fd == -1)
144 return NULL;
146 h = malloc(sizeof(*h));
147 if (h == NULL) {
148 saved_errno = errno;
149 close(fd);
150 errno = saved_errno;
151 return NULL;
152 }
154 memset(h, 0, sizeof(*h));
156 h->fd = fd;
158 /* Watch pipe is allocated on demand in xs_fileno(). */
159 h->watch_pipe[0] = h->watch_pipe[1] = -1;
161 INIT_LIST_HEAD(&h->watch_list);
162 pthread_mutex_init(&h->watch_mutex, NULL);
163 pthread_cond_init(&h->watch_condvar, NULL);
165 INIT_LIST_HEAD(&h->reply_list);
166 pthread_mutex_init(&h->reply_mutex, NULL);
167 pthread_cond_init(&h->reply_condvar, NULL);
169 pthread_mutex_init(&h->request_mutex, NULL);
171 return h;
172 }
/* Open a handle on the path reported by xs_daemon_socket(). */
struct xs_handle *xs_daemon_open(void)
{
	const char *path = xs_daemon_socket();

	return get_handle(path);
}
/* Open a handle on the path reported by xs_daemon_socket_ro(). */
struct xs_handle *xs_daemon_open_readonly(void)
{
	const char *path = xs_daemon_socket_ro();

	return get_handle(path);
}
/* Open a handle on the path reported by xs_domain_dev(). */
struct xs_handle *xs_domain_open(void)
{
	const char *path = xs_domain_dev();

	return get_handle(path);
}
189 void xs_daemon_close(struct xs_handle *h)
190 {
191 struct xs_stored_msg *msg, *tmsg;
193 pthread_mutex_lock(&h->request_mutex);
194 pthread_mutex_lock(&h->reply_mutex);
195 pthread_mutex_lock(&h->watch_mutex);
197 if (h->read_thr_exists) {
198 /* XXX FIXME: May leak an unpublished message buffer. */
199 pthread_cancel(h->read_thr);
200 pthread_join(h->read_thr, NULL);
201 }
203 list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
204 free(msg->body);
205 free(msg);
206 }
208 list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
209 free(msg->body);
210 free(msg);
211 }
213 pthread_mutex_unlock(&h->request_mutex);
214 pthread_mutex_unlock(&h->reply_mutex);
215 pthread_mutex_unlock(&h->watch_mutex);
217 if (h->watch_pipe[0] != -1) {
218 close(h->watch_pipe[0]);
219 close(h->watch_pipe[1]);
220 }
222 close(h->fd);
224 free(h);
225 }
/*
 * Read exactly len bytes from fd into data, retrying after EINTR and after
 * short reads.
 *
 * Returns true once all len bytes have arrived (trivially so for len == 0).
 * Returns false with errno set by read() on failure, or with errno set to
 * EBADF if end-of-file is reached before len bytes were read.
 */
static bool read_all(int fd, void *data, unsigned int len)
{
	/* Advance a char pointer: arithmetic on void * is not standard C. */
	char *cursor = data;

	while (len) {
		/* read() returns ssize_t; keep the full width of the result. */
		ssize_t done = read(fd, cursor, len);

		if (done < 0) {
			if (errno == EINTR)
				continue;
			return false;
		}
		if (done == 0) {
			/* It closed fd on us?  EBADF is appropriate. */
			errno = EBADF;
			return false;
		}
		cursor += done;
		len -= (unsigned int)done;
	}

	return true;
}
250 #ifdef XSTEST
251 #define read_all read_all_choice
252 #define xs_write_all write_all_choice
253 #endif
255 static int get_error(const char *errorstring)
256 {
257 unsigned int i;
259 for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++)
260 if (i == ARRAY_SIZE(xsd_errors) - 1)
261 return EINVAL;
262 return xsd_errors[i].errnum;
263 }
265 /* Adds extra nul terminator, because we generally (always?) hold strings. */
266 static void *read_reply(
267 struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
268 {
269 struct xs_stored_msg *msg;
270 char *body;
272 /* Read from comms channel ourselves if there is no reader thread. */
273 if (!h->read_thr_exists && (read_message(h) == -1))
274 return NULL;
276 pthread_mutex_lock(&h->reply_mutex);
277 while (list_empty(&h->reply_list))
278 pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
279 msg = list_top(&h->reply_list, struct xs_stored_msg, list);
280 list_del(&msg->list);
281 assert(list_empty(&h->reply_list));
282 pthread_mutex_unlock(&h->reply_mutex);
284 *type = msg->hdr.type;
285 if (len)
286 *len = msg->hdr.len;
287 body = msg->body;
289 free(msg);
291 return body;
292 }
294 /* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
295 static void *xs_talkv(struct xs_handle *h, struct xs_transaction_handle *t,
296 enum xsd_sockmsg_type type,
297 const struct iovec *iovec,
298 unsigned int num_vecs,
299 unsigned int *len)
300 {
301 struct xsd_sockmsg msg;
302 void *ret = NULL;
303 int saved_errno;
304 unsigned int i;
305 struct sigaction ignorepipe, oldact;
307 msg.tx_id = (uint32_t)(unsigned long)t;
308 msg.req_id = 0;
309 msg.type = type;
310 msg.len = 0;
311 for (i = 0; i < num_vecs; i++)
312 msg.len += iovec[i].iov_len;
314 ignorepipe.sa_handler = SIG_IGN;
315 sigemptyset(&ignorepipe.sa_mask);
316 ignorepipe.sa_flags = 0;
317 sigaction(SIGPIPE, &ignorepipe, &oldact);
319 pthread_mutex_lock(&h->request_mutex);
321 if (!xs_write_all(h->fd, &msg, sizeof(msg)))
322 goto fail;
324 for (i = 0; i < num_vecs; i++)
325 if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
326 goto fail;
328 ret = read_reply(h, &msg.type, len);
329 if (!ret)
330 goto fail;
332 pthread_mutex_unlock(&h->request_mutex);
334 sigaction(SIGPIPE, &oldact, NULL);
335 if (msg.type == XS_ERROR) {
336 saved_errno = get_error(ret);
337 free(ret);
338 errno = saved_errno;
339 return NULL;
340 }
342 if (msg.type != type) {
343 free(ret);
344 saved_errno = EBADF;
345 goto close_fd;
347 }
348 return ret;
350 fail:
351 /* We're in a bad state, so close fd. */
352 saved_errno = errno;
353 pthread_mutex_unlock(&h->request_mutex);
354 sigaction(SIGPIPE, &oldact, NULL);
355 close_fd:
356 close(h->fd);
357 h->fd = -1;
358 errno = saved_errno;
359 return NULL;
360 }
/* Release p with free() while leaving errno exactly as the caller had it. */
static void free_no_errno(void *p)
{
	const int errno_before = errno;

	free(p);
	errno = errno_before;
}
370 /* Simplified version of xs_talkv: single message. */
371 static void *xs_single(struct xs_handle *h, struct xs_transaction_handle *t,
372 enum xsd_sockmsg_type type,
373 const char *string,
374 unsigned int *len)
375 {
376 struct iovec iovec;
378 iovec.iov_base = (void *)string;
379 iovec.iov_len = strlen(string) + 1;
380 return xs_talkv(h, t, type, &iovec, 1, len);
381 }
/*
 * Reduce a reply to success/failure: true if a reply was received, false
 * if it is NULL.  The reply, if any, is freed.
 */
static bool xs_bool(char *reply)
{
	const bool got_reply = (reply != NULL);

	/* free(NULL) is a no-op, so no separate branch is needed. */
	free(reply);
	return got_reply;
}
391 char **xs_directory(struct xs_handle *h, struct xs_transaction_handle *t,
392 const char *path, unsigned int *num)
393 {
394 char *strings, *p, **ret;
395 unsigned int len;
397 strings = xs_single(h, t, XS_DIRECTORY, path, &len);
398 if (!strings)
399 return NULL;
401 /* Count the strings. */
402 *num = xs_count_strings(strings, len);
404 /* Transfer to one big alloc for easy freeing. */
405 ret = malloc(*num * sizeof(char *) + len);
406 if (!ret) {
407 free_no_errno(strings);
408 return NULL;
409 }
410 memcpy(&ret[*num], strings, len);
411 free_no_errno(strings);
413 strings = (char *)&ret[*num];
414 for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
415 ret[(*num)++] = p;
416 return ret;
417 }
419 /* Get the value of a single file, nul terminated.
420 * Returns a malloced value: call free() on it after use.
421 * len indicates length in bytes, not including the nul.
422 */
423 void *xs_read(struct xs_handle *h, struct xs_transaction_handle *t,
424 const char *path, unsigned int *len)
425 {
426 return xs_single(h, t, XS_READ, path, len);
427 }
429 /* Write the value of a single file.
430 * Returns false on failure.
431 */
432 bool xs_write(struct xs_handle *h, struct xs_transaction_handle *t,
433 const char *path, const void *data, unsigned int len)
434 {
435 struct iovec iovec[2];
437 iovec[0].iov_base = (void *)path;
438 iovec[0].iov_len = strlen(path) + 1;
439 iovec[1].iov_base = (void *)data;
440 iovec[1].iov_len = len;
442 return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
443 ARRAY_SIZE(iovec), NULL));
444 }
446 /* Create a new directory.
447 * Returns false on failure, or success if it already exists.
448 */
449 bool xs_mkdir(struct xs_handle *h, struct xs_transaction_handle *t,
450 const char *path)
451 {
452 return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
453 }
455 /* Destroy a file or directory (directories must be empty).
456 * Returns false on failure, or success if it doesn't exist.
457 */
458 bool xs_rm(struct xs_handle *h, struct xs_transaction_handle *t,
459 const char *path)
460 {
461 return xs_bool(xs_single(h, t, XS_RM, path, NULL));
462 }
464 /* Get permissions of node (first element is owner).
465 * Returns malloced array, or NULL: call free() after use.
466 */
467 struct xs_permissions *xs_get_permissions(struct xs_handle *h,
468 struct xs_transaction_handle *t,
469 const char *path, unsigned int *num)
470 {
471 char *strings;
472 unsigned int len;
473 struct xs_permissions *ret;
475 strings = xs_single(h, t, XS_GET_PERMS, path, &len);
476 if (!strings)
477 return NULL;
479 /* Count the strings: each one perms then domid. */
480 *num = xs_count_strings(strings, len);
482 /* Transfer to one big alloc for easy freeing. */
483 ret = malloc(*num * sizeof(struct xs_permissions));
484 if (!ret) {
485 free_no_errno(strings);
486 return NULL;
487 }
489 if (!xs_strings_to_perms(ret, *num, strings)) {
490 free_no_errno(ret);
491 ret = NULL;
492 }
494 free(strings);
495 return ret;
496 }
498 /* Set permissions of node (must be owner).
499 * Returns false on failure.
500 */
501 bool xs_set_permissions(struct xs_handle *h,
502 struct xs_transaction_handle *t,
503 const char *path,
504 struct xs_permissions *perms,
505 unsigned int num_perms)
506 {
507 unsigned int i;
508 struct iovec iov[1+num_perms];
510 iov[0].iov_base = (void *)path;
511 iov[0].iov_len = strlen(path) + 1;
513 for (i = 0; i < num_perms; i++) {
514 char buffer[MAX_STRLEN(unsigned int)+1];
516 if (!xs_perm_to_string(&perms[i], buffer))
517 goto unwind;
519 iov[i+1].iov_base = strdup(buffer);
520 iov[i+1].iov_len = strlen(buffer) + 1;
521 if (!iov[i+1].iov_base)
522 goto unwind;
523 }
525 if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
526 goto unwind;
527 for (i = 0; i < num_perms; i++)
528 free(iov[i+1].iov_base);
529 return true;
531 unwind:
532 num_perms = i;
533 for (i = 0; i < num_perms; i++)
534 free_no_errno(iov[i+1].iov_base);
535 return false;
536 }
538 /* Watch a node for changes (poll on fd to detect, or call read_watch()).
539 * When the node (or any child) changes, fd will become readable.
540 * Token is returned when watch is read, to allow matching.
541 * Returns false on failure.
542 */
543 bool xs_watch(struct xs_handle *h, const char *path, const char *token)
544 {
545 struct iovec iov[2];
547 /* We dynamically create a reader thread on demand. */
548 pthread_mutex_lock(&h->request_mutex);
549 if (!h->read_thr_exists) {
550 if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
551 pthread_mutex_unlock(&h->request_mutex);
552 return false;
553 }
554 h->read_thr_exists = 1;
555 }
556 pthread_mutex_unlock(&h->request_mutex);
558 iov[0].iov_base = (void *)path;
559 iov[0].iov_len = strlen(path) + 1;
560 iov[1].iov_base = (void *)token;
561 iov[1].iov_len = strlen(token) + 1;
563 return xs_bool(xs_talkv(h, NULL, XS_WATCH, iov,
564 ARRAY_SIZE(iov), NULL));
565 }
567 /* Find out what node change was on (will block if nothing pending).
568 * Returns array of two pointers: path and token, or NULL.
569 * Call free() after use.
570 */
571 char **xs_read_watch(struct xs_handle *h, unsigned int *num)
572 {
573 struct xs_stored_msg *msg;
574 char **ret, *strings, c = 0;
575 unsigned int num_strings, i;
577 pthread_mutex_lock(&h->watch_mutex);
579 /* Wait on the condition variable for a watch to fire. */
580 while (list_empty(&h->watch_list))
581 pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
582 msg = list_top(&h->watch_list, struct xs_stored_msg, list);
583 list_del(&msg->list);
585 /* Clear the pipe token if there are no more pending watches. */
586 if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
587 while (read(h->watch_pipe[0], &c, 1) != 1)
588 continue;
590 pthread_mutex_unlock(&h->watch_mutex);
592 assert(msg->hdr.type == XS_WATCH_EVENT);
594 strings = msg->body;
595 num_strings = xs_count_strings(strings, msg->hdr.len);
597 ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
598 if (!ret) {
599 free_no_errno(strings);
600 free_no_errno(msg);
601 return NULL;
602 }
604 ret[0] = (char *)(ret + num_strings);
605 memcpy(ret[0], strings, msg->hdr.len);
607 free(strings);
608 free(msg);
610 for (i = 1; i < num_strings; i++)
611 ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
613 *num = num_strings;
615 return ret;
616 }
618 /* Remove a watch on a node.
619 * Returns false on failure (no watch on that node).
620 */
621 bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
622 {
623 struct iovec iov[2];
625 iov[0].iov_base = (char *)path;
626 iov[0].iov_len = strlen(path) + 1;
627 iov[1].iov_base = (char *)token;
628 iov[1].iov_len = strlen(token) + 1;
630 return xs_bool(xs_talkv(h, NULL, XS_UNWATCH, iov,
631 ARRAY_SIZE(iov), NULL));
632 }
634 /* Start a transaction: changes by others will not be seen during this
635 * transaction, and changes will not be visible to others until end.
636 * You can only have one transaction at any time.
637 * Returns NULL on failure.
638 */
639 struct xs_transaction_handle *xs_transaction_start(struct xs_handle *h)
640 {
641 char *id_str;
642 unsigned long id;
644 id_str = xs_single(h, NULL, XS_TRANSACTION_START, "", NULL);
645 if (id_str == NULL)
646 return NULL;
648 id = strtoul(id_str, NULL, 0);
649 free(id_str);
651 return (struct xs_transaction_handle *)id;
652 }
654 /* End a transaction.
655 * If abandon is true, transaction is discarded instead of committed.
656 * Returns false on failure, which indicates an error: transactions will
657 * not fail spuriously.
658 */
659 bool xs_transaction_end(struct xs_handle *h, struct xs_transaction_handle *t,
660 bool abort)
661 {
662 char abortstr[2];
664 if (abort)
665 strcpy(abortstr, "F");
666 else
667 strcpy(abortstr, "T");
669 return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
670 }
672 /* Introduce a new domain.
673 * This tells the store daemon about a shared memory page and event channel
674 * associated with a domain: the domain uses these to communicate.
675 */
676 bool xs_introduce_domain(struct xs_handle *h,
677 unsigned int domid, unsigned long mfn,
678 unsigned int eventchn)
679 {
680 char domid_str[MAX_STRLEN(domid)];
681 char mfn_str[MAX_STRLEN(mfn)];
682 char eventchn_str[MAX_STRLEN(eventchn)];
683 struct iovec iov[3];
685 sprintf(domid_str, "%u", domid);
686 sprintf(mfn_str, "%lu", mfn);
687 sprintf(eventchn_str, "%u", eventchn);
689 iov[0].iov_base = domid_str;
690 iov[0].iov_len = strlen(domid_str) + 1;
691 iov[1].iov_base = mfn_str;
692 iov[1].iov_len = strlen(mfn_str) + 1;
693 iov[2].iov_base = eventchn_str;
694 iov[2].iov_len = strlen(eventchn_str) + 1;
696 return xs_bool(xs_talkv(h, NULL, XS_INTRODUCE, iov,
697 ARRAY_SIZE(iov), NULL));
698 }
700 static void * single_with_domid(struct xs_handle *h,
701 enum xsd_sockmsg_type type,
702 unsigned int domid)
703 {
704 char domid_str[MAX_STRLEN(domid)];
706 sprintf(domid_str, "%u", domid);
708 return xs_single(h, NULL, type, domid_str, NULL);
709 }
711 bool xs_release_domain(struct xs_handle *h, unsigned int domid)
712 {
713 return xs_bool(single_with_domid(h, XS_RELEASE, domid));
714 }
716 char *xs_get_domain_path(struct xs_handle *h, unsigned int domid)
717 {
718 char domid_str[MAX_STRLEN(domid)];
720 sprintf(domid_str, "%u", domid);
722 return xs_single(h, NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
723 }
725 bool xs_is_domain_introduced(struct xs_handle *h, unsigned int domid)
726 {
727 return strcmp("F",
728 single_with_domid(h, XS_IS_DOMAIN_INTRODUCED, domid));
729 }
731 /* Only useful for DEBUG versions */
732 char *xs_debug_command(struct xs_handle *h, const char *cmd,
733 void *data, unsigned int len)
734 {
735 struct iovec iov[2];
737 iov[0].iov_base = (void *)cmd;
738 iov[0].iov_len = strlen(cmd) + 1;
739 iov[1].iov_base = data;
740 iov[1].iov_len = len;
742 return xs_talkv(h, NULL, XS_DEBUG, iov,
743 ARRAY_SIZE(iov), NULL);
744 }
746 static int read_message(struct xs_handle *h)
747 {
748 struct xs_stored_msg *msg = NULL;
749 char *body = NULL;
750 int saved_errno;
752 /* Allocate message structure and read the message header. */
753 msg = malloc(sizeof(*msg));
754 if (msg == NULL)
755 goto error;
756 if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
757 goto error;
759 /* Allocate and read the message body. */
760 body = msg->body = malloc(msg->hdr.len + 1);
761 if (body == NULL)
762 goto error;
763 if (!read_all(h->fd, body, msg->hdr.len))
764 goto error;
765 body[msg->hdr.len] = '\0';
767 if (msg->hdr.type == XS_WATCH_EVENT) {
768 pthread_mutex_lock(&h->watch_mutex);
770 /* Kick users out of their select() loop. */
771 if (list_empty(&h->watch_list) &&
772 (h->watch_pipe[1] != -1))
773 while (write(h->watch_pipe[1], body, 1) != 1)
774 continue;
776 list_add_tail(&msg->list, &h->watch_list);
777 pthread_cond_signal(&h->watch_condvar);
779 pthread_mutex_unlock(&h->watch_mutex);
780 } else {
781 pthread_mutex_lock(&h->reply_mutex);
783 /* There should only ever be one response pending! */
784 if (!list_empty(&h->reply_list)) {
785 pthread_mutex_unlock(&h->reply_mutex);
786 goto error;
787 }
789 list_add_tail(&msg->list, &h->reply_list);
790 pthread_cond_signal(&h->reply_condvar);
792 pthread_mutex_unlock(&h->reply_mutex);
793 }
795 return 0;
797 error:
798 saved_errno = errno;
799 free(msg);
800 free(body);
801 errno = saved_errno;
802 return -1;
803 }
805 static void *read_thread(void *arg)
806 {
807 struct xs_handle *h = arg;
809 while (read_message(h) != -1)
810 continue;
812 return NULL;
813 }
815 /*
816 * Local variables:
817 * c-file-style: "linux"
818 * indent-tabs-mode: t
819 * c-indent-level: 8
820 * c-basic-offset: 8
821 * tab-width: 8
822 * End:
823 */