ia64/xen-unstable

view tools/xenstore/xs.c @ 9075:d8451bb6278c

Remove unused #include <sys/ioctl.h>.

Signed-off-by: Ewan Mellor <ewan@xensource.com>
author emellor@leeni.uk.xensource.com
date Wed Mar 01 17:52:37 2006 +0100 (2006-03-01)
parents e629bb62c63e
children fdc4531aefe0
line source
1 /*
2 Xen Store Daemon interface providing simple tree-like database.
3 Copyright (C) 2005 Rusty Russell IBM Corporation
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 2.1 of the License, or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <sys/socket.h>
24 #include <sys/un.h>
25 #include <string.h>
26 #include <unistd.h>
27 #include <stdbool.h>
28 #include <stdlib.h>
29 #include <assert.h>
30 #include <stdio.h>
31 #include <signal.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <pthread.h>
35 #include "xs.h"
36 #include "list.h"
37 #include "utils.h"
/*
 * One message received from the comms channel, queued until a consumer
 * (read_reply() or xs_read_watch()) takes it off its list.
 */
39 struct xs_stored_msg {
/* Links the message into either reply_list or watch_list of the handle. */
40 struct list_head list;
/* Message header exactly as read from the channel. */
41 struct xsd_sockmsg hdr;
/* malloc'ed payload: hdr.len bytes followed by one extra nul byte. */
42 char *body;
43 };
/*
 * Per-connection state. Allocated and initialised by get_handle(),
 * torn down by xs_daemon_close().
 */
45 struct xs_handle {
46 /* Communications channel to xenstore daemon. */
/* Set to -1 by xs_talkv() after it closes the channel on a failure. */
47 int fd;
49 /*
50 * A read thread which pulls messages off the comms channel and
51 * signals waiters.
52 */
53 pthread_t read_thr;
/* Non-zero once xs_watch() has successfully created read_thr. */
54 int read_thr_exists;
56 /*
57 * A list of fired watch messages, protected by a mutex. Users can
58 * wait on the conditional variable until a watch is pending.
59 */
60 struct list_head watch_list;
61 pthread_mutex_t watch_mutex;
62 pthread_cond_t watch_condvar;
64 /* Clients can select() on this pipe to wait for a watch to fire. */
/* Both ends are -1 until xs_fileno() creates the pipe on demand. */
65 int watch_pipe[2];
67 /*
68 * A list of replies. Currently only one will ever be outstanding
69 * because we serialise requests. The requester can wait on the
70 * conditional variable for its response.
71 */
72 struct list_head reply_list;
73 pthread_mutex_t reply_mutex;
74 pthread_cond_t reply_condvar;
76 /* One request at a time. */
/* Held by xs_talkv() for the whole write-request/read-reply exchange. */
77 pthread_mutex_t request_mutex;
78 };
/*
 * Forward declarations: read_reply() and xs_watch() use these before
 * their definitions further down in this file.
 */
80 static int read_message(struct xs_handle *h);
81 static void *read_thread(void *arg);
83 int xs_fileno(struct xs_handle *h)
84 {
85 char c = 0;
87 pthread_mutex_lock(&h->watch_mutex);
89 if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
90 /* Kick things off if the watch list is already non-empty. */
91 if (!list_empty(&h->watch_list))
92 while (write(h->watch_pipe[1], &c, 1) != 1)
93 continue;
94 }
96 pthread_mutex_unlock(&h->watch_mutex);
98 return h->watch_pipe[0];
99 }
/*
 * Connect a stream socket to the Unix-domain socket at connect_to.
 *
 * Returns the connected descriptor, or -1 with errno set. A path that
 * does not fit in sockaddr_un.sun_path (including its terminating nul)
 * is rejected with ENAMETOOLONG instead of overflowing the address.
 */
static int get_socket(const char *connect_to)
{
	struct sockaddr_un addr;
	size_t path_len = strlen(connect_to);
	int sock, saved_errno;

	/* Validate before creating anything so there is nothing to undo. */
	if (path_len >= sizeof(addr.sun_path)) {
		errno = ENAMETOOLONG;
		return -1;
	}

	sock = socket(PF_UNIX, SOCK_STREAM, 0);
	if (sock < 0)
		return -1;

	/* Zero the whole address: connect() is handed sizeof(addr) bytes. */
	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	memcpy(addr.sun_path, connect_to, path_len + 1);

	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
		/* close() may overwrite errno; report the connect() failure. */
		saved_errno = errno;
		close(sock);
		errno = saved_errno;
		return -1;
	}

	return sock;
}
/*
 * Open the node at connect_to for reading and writing.
 * Returns the descriptor from open(), i.e. -1 with errno set on failure.
 */
static int get_dev(const char *connect_to)
{
	int dev_fd = open(connect_to, O_RDWR);

	return dev_fd;
}
128 static struct xs_handle *get_handle(const char *connect_to)
129 {
130 struct stat buf;
131 struct xs_handle *h = NULL;
132 int fd = -1, saved_errno;
134 if (stat(connect_to, &buf) != 0)
135 return NULL;
137 if (S_ISSOCK(buf.st_mode))
138 fd = get_socket(connect_to);
139 else
140 fd = get_dev(connect_to);
142 if (fd == -1)
143 return NULL;
145 h = malloc(sizeof(*h));
146 if (h == NULL) {
147 saved_errno = errno;
148 close(fd);
149 errno = saved_errno;
150 return NULL;
151 }
153 memset(h, 0, sizeof(*h));
155 h->fd = fd;
157 /* Watch pipe is allocated on demand in xs_fileno(). */
158 h->watch_pipe[0] = h->watch_pipe[1] = -1;
160 INIT_LIST_HEAD(&h->watch_list);
161 pthread_mutex_init(&h->watch_mutex, NULL);
162 pthread_cond_init(&h->watch_condvar, NULL);
164 INIT_LIST_HEAD(&h->reply_list);
165 pthread_mutex_init(&h->reply_mutex, NULL);
166 pthread_cond_init(&h->reply_condvar, NULL);
168 pthread_mutex_init(&h->request_mutex, NULL);
170 return h;
171 }
/*
 * Open a handle on the path returned by xs_daemon_socket().
 * Returns NULL on failure.
 */
struct xs_handle *xs_daemon_open(void)
{
	const char *where = xs_daemon_socket();

	return get_handle(where);
}
/*
 * Open a handle on the path returned by xs_daemon_socket_ro().
 * Returns NULL on failure.
 */
struct xs_handle *xs_daemon_open_readonly(void)
{
	const char *where = xs_daemon_socket_ro();

	return get_handle(where);
}
/*
 * Open a handle on the path returned by xs_domain_dev().
 * Returns NULL on failure.
 */
struct xs_handle *xs_domain_open(void)
{
	const char *where = xs_domain_dev();

	return get_handle(where);
}
188 void xs_daemon_close(struct xs_handle *h)
189 {
190 struct xs_stored_msg *msg, *tmsg;
192 pthread_mutex_lock(&h->request_mutex);
193 pthread_mutex_lock(&h->reply_mutex);
194 pthread_mutex_lock(&h->watch_mutex);
196 if (h->read_thr_exists) {
197 /* XXX FIXME: May leak an unpublished message buffer. */
198 pthread_cancel(h->read_thr);
199 pthread_join(h->read_thr, NULL);
200 }
202 list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
203 free(msg->body);
204 free(msg);
205 }
207 list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
208 free(msg->body);
209 free(msg);
210 }
212 pthread_mutex_unlock(&h->request_mutex);
213 pthread_mutex_unlock(&h->reply_mutex);
214 pthread_mutex_unlock(&h->watch_mutex);
216 if (h->watch_pipe[0] != -1) {
217 close(h->watch_pipe[0]);
218 close(h->watch_pipe[1]);
219 }
221 close(h->fd);
223 free(h);
224 }
/*
 * Read exactly len bytes from fd into data.
 *
 * Retries when read() is interrupted (EINTR). Returns true once all
 * bytes have arrived. Returns false with errno set if read() fails, or
 * with errno = EBADF if end-of-file is reached first.
 */
static bool read_all(int fd, void *data, unsigned int len)
{
	/* Byte pointer: arithmetic on void * is not standard C. */
	char *cursor = data;

	while (len) {
		ssize_t done;

		done = read(fd, cursor, len);
		if (done < 0) {
			if (errno == EINTR)
				continue;
			return false;
		}
		if (done == 0) {
			/* It closed fd on us?  EBADF is appropriate. */
			errno = EBADF;
			return false;
		}
		cursor += done;
		len -= (unsigned int)done;
	}

	return true;
}
/*
 * When XSTEST is defined, every later use of read_all / xs_write_all in
 * this file is redirected to read_all_choice / write_all_choice.
 * NOTE(review): those replacements are not defined in this file --
 * presumably supplied by the test harness; confirm.
 */
249 #ifdef XSTEST
250 #define read_all read_all_choice
251 #define xs_write_all write_all_choice
252 #endif
254 static int get_error(const char *errorstring)
255 {
256 unsigned int i;
258 for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++)
259 if (i == ARRAY_SIZE(xsd_errors) - 1)
260 return EINVAL;
261 return xsd_errors[i].errnum;
262 }
264 /* Adds extra nul terminator, because we generally (always?) hold strings. */
265 static void *read_reply(
266 struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
267 {
268 struct xs_stored_msg *msg;
269 char *body;
271 /* Read from comms channel ourselves if there is no reader thread. */
272 if (!h->read_thr_exists && (read_message(h) == -1))
273 return NULL;
275 pthread_mutex_lock(&h->reply_mutex);
276 while (list_empty(&h->reply_list))
277 pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
278 msg = list_top(&h->reply_list, struct xs_stored_msg, list);
279 list_del(&msg->list);
280 assert(list_empty(&h->reply_list));
281 pthread_mutex_unlock(&h->reply_mutex);
283 *type = msg->hdr.type;
284 if (len)
285 *len = msg->hdr.len;
286 body = msg->body;
288 free(msg);
290 return body;
291 }
293 /* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
294 static void *xs_talkv(struct xs_handle *h, xs_transaction_t t,
295 enum xsd_sockmsg_type type,
296 const struct iovec *iovec,
297 unsigned int num_vecs,
298 unsigned int *len)
299 {
300 struct xsd_sockmsg msg;
301 void *ret = NULL;
302 int saved_errno;
303 unsigned int i;
304 struct sigaction ignorepipe, oldact;
306 msg.tx_id = t;
307 msg.req_id = 0;
308 msg.type = type;
309 msg.len = 0;
310 for (i = 0; i < num_vecs; i++)
311 msg.len += iovec[i].iov_len;
313 ignorepipe.sa_handler = SIG_IGN;
314 sigemptyset(&ignorepipe.sa_mask);
315 ignorepipe.sa_flags = 0;
316 sigaction(SIGPIPE, &ignorepipe, &oldact);
318 pthread_mutex_lock(&h->request_mutex);
320 if (!xs_write_all(h->fd, &msg, sizeof(msg)))
321 goto fail;
323 for (i = 0; i < num_vecs; i++)
324 if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
325 goto fail;
327 ret = read_reply(h, &msg.type, len);
328 if (!ret)
329 goto fail;
331 pthread_mutex_unlock(&h->request_mutex);
333 sigaction(SIGPIPE, &oldact, NULL);
334 if (msg.type == XS_ERROR) {
335 saved_errno = get_error(ret);
336 free(ret);
337 errno = saved_errno;
338 return NULL;
339 }
341 if (msg.type != type) {
342 free(ret);
343 saved_errno = EBADF;
344 goto close_fd;
345 }
346 return ret;
348 fail:
349 /* We're in a bad state, so close fd. */
350 saved_errno = errno;
351 pthread_mutex_unlock(&h->request_mutex);
352 sigaction(SIGPIPE, &oldact, NULL);
353 close_fd:
354 close(h->fd);
355 h->fd = -1;
356 errno = saved_errno;
357 return NULL;
358 }
/* Release p with free() while leaving the caller's errno untouched. */
static void free_no_errno(void *p)
{
	const int kept = errno;

	free(p);
	errno = kept;
}
368 /* Simplified version of xs_talkv: single message. */
369 static void *xs_single(struct xs_handle *h, xs_transaction_t t,
370 enum xsd_sockmsg_type type,
371 const char *string,
372 unsigned int *len)
373 {
374 struct iovec iovec;
376 iovec.iov_base = (void *)string;
377 iovec.iov_len = strlen(string) + 1;
378 return xs_talkv(h, t, type, &iovec, 1, len);
379 }
/*
 * Reduce a reply to success/failure: a non-NULL reply is freed and
 * counts as success, a NULL reply is failure.
 */
static bool xs_bool(char *reply)
{
	bool ok = (reply != NULL);

	/* free(NULL) is a no-op, so no separate branch is needed. */
	free(reply);
	return ok;
}
389 char **xs_directory(struct xs_handle *h, xs_transaction_t t,
390 const char *path, unsigned int *num)
391 {
392 char *strings, *p, **ret;
393 unsigned int len;
395 strings = xs_single(h, t, XS_DIRECTORY, path, &len);
396 if (!strings)
397 return NULL;
399 /* Count the strings. */
400 *num = xs_count_strings(strings, len);
402 /* Transfer to one big alloc for easy freeing. */
403 ret = malloc(*num * sizeof(char *) + len);
404 if (!ret) {
405 free_no_errno(strings);
406 return NULL;
407 }
408 memcpy(&ret[*num], strings, len);
409 free_no_errno(strings);
411 strings = (char *)&ret[*num];
412 for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
413 ret[(*num)++] = p;
414 return ret;
415 }
417 /* Get the value of a single file, nul terminated.
418 * Returns a malloced value: call free() on it after use.
419 * len indicates length in bytes, not including the nul.
420 */
421 void *xs_read(struct xs_handle *h, xs_transaction_t t,
422 const char *path, unsigned int *len)
423 {
424 return xs_single(h, t, XS_READ, path, len);
425 }
427 /* Write the value of a single file.
428 * Returns false on failure.
429 */
430 bool xs_write(struct xs_handle *h, xs_transaction_t t,
431 const char *path, const void *data, unsigned int len)
432 {
433 struct iovec iovec[2];
435 iovec[0].iov_base = (void *)path;
436 iovec[0].iov_len = strlen(path) + 1;
437 iovec[1].iov_base = (void *)data;
438 iovec[1].iov_len = len;
440 return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
441 ARRAY_SIZE(iovec), NULL));
442 }
444 /* Create a new directory.
445 * Returns false on failure, or success if it already exists.
446 */
447 bool xs_mkdir(struct xs_handle *h, xs_transaction_t t,
448 const char *path)
449 {
450 return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
451 }
453 /* Destroy a file or directory (directories must be empty).
454 * Returns false on failure, or success if it doesn't exist.
455 */
456 bool xs_rm(struct xs_handle *h, xs_transaction_t t,
457 const char *path)
458 {
459 return xs_bool(xs_single(h, t, XS_RM, path, NULL));
460 }
462 /* Get permissions of node (first element is owner).
463 * Returns malloced array, or NULL: call free() after use.
464 */
465 struct xs_permissions *xs_get_permissions(struct xs_handle *h,
466 xs_transaction_t t,
467 const char *path, unsigned int *num)
468 {
469 char *strings;
470 unsigned int len;
471 struct xs_permissions *ret;
473 strings = xs_single(h, t, XS_GET_PERMS, path, &len);
474 if (!strings)
475 return NULL;
477 /* Count the strings: each one perms then domid. */
478 *num = xs_count_strings(strings, len);
480 /* Transfer to one big alloc for easy freeing. */
481 ret = malloc(*num * sizeof(struct xs_permissions));
482 if (!ret) {
483 free_no_errno(strings);
484 return NULL;
485 }
487 if (!xs_strings_to_perms(ret, *num, strings)) {
488 free_no_errno(ret);
489 ret = NULL;
490 }
492 free(strings);
493 return ret;
494 }
496 /* Set permissions of node (must be owner).
497 * Returns false on failure.
498 */
499 bool xs_set_permissions(struct xs_handle *h,
500 xs_transaction_t t,
501 const char *path,
502 struct xs_permissions *perms,
503 unsigned int num_perms)
504 {
505 unsigned int i;
506 struct iovec iov[1+num_perms];
508 iov[0].iov_base = (void *)path;
509 iov[0].iov_len = strlen(path) + 1;
511 for (i = 0; i < num_perms; i++) {
512 char buffer[MAX_STRLEN(unsigned int)+1];
514 if (!xs_perm_to_string(&perms[i], buffer))
515 goto unwind;
517 iov[i+1].iov_base = strdup(buffer);
518 iov[i+1].iov_len = strlen(buffer) + 1;
519 if (!iov[i+1].iov_base)
520 goto unwind;
521 }
523 if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
524 goto unwind;
525 for (i = 0; i < num_perms; i++)
526 free(iov[i+1].iov_base);
527 return true;
529 unwind:
530 num_perms = i;
531 for (i = 0; i < num_perms; i++)
532 free_no_errno(iov[i+1].iov_base);
533 return false;
534 }
536 /* Watch a node for changes (poll on fd to detect, or call read_watch()).
537 * When the node (or any child) changes, fd will become readable.
538 * Token is returned when watch is read, to allow matching.
539 * Returns false on failure.
540 */
541 bool xs_watch(struct xs_handle *h, const char *path, const char *token)
542 {
543 struct iovec iov[2];
545 /* We dynamically create a reader thread on demand. */
546 pthread_mutex_lock(&h->request_mutex);
547 if (!h->read_thr_exists) {
548 if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
549 pthread_mutex_unlock(&h->request_mutex);
550 return false;
551 }
552 h->read_thr_exists = 1;
553 }
554 pthread_mutex_unlock(&h->request_mutex);
556 iov[0].iov_base = (void *)path;
557 iov[0].iov_len = strlen(path) + 1;
558 iov[1].iov_base = (void *)token;
559 iov[1].iov_len = strlen(token) + 1;
561 return xs_bool(xs_talkv(h, XBT_NULL, XS_WATCH, iov,
562 ARRAY_SIZE(iov), NULL));
563 }
565 /* Find out what node change was on (will block if nothing pending).
566 * Returns array of two pointers: path and token, or NULL.
567 * Call free() after use.
568 */
569 char **xs_read_watch(struct xs_handle *h, unsigned int *num)
570 {
571 struct xs_stored_msg *msg;
572 char **ret, *strings, c = 0;
573 unsigned int num_strings, i;
575 pthread_mutex_lock(&h->watch_mutex);
577 /* Wait on the condition variable for a watch to fire. */
578 while (list_empty(&h->watch_list))
579 pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
580 msg = list_top(&h->watch_list, struct xs_stored_msg, list);
581 list_del(&msg->list);
583 /* Clear the pipe token if there are no more pending watches. */
584 if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
585 while (read(h->watch_pipe[0], &c, 1) != 1)
586 continue;
588 pthread_mutex_unlock(&h->watch_mutex);
590 assert(msg->hdr.type == XS_WATCH_EVENT);
592 strings = msg->body;
593 num_strings = xs_count_strings(strings, msg->hdr.len);
595 ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
596 if (!ret) {
597 free_no_errno(strings);
598 free_no_errno(msg);
599 return NULL;
600 }
602 ret[0] = (char *)(ret + num_strings);
603 memcpy(ret[0], strings, msg->hdr.len);
605 free(strings);
606 free(msg);
608 for (i = 1; i < num_strings; i++)
609 ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
611 *num = num_strings;
613 return ret;
614 }
616 /* Remove a watch on a node.
617 * Returns false on failure (no watch on that node).
618 */
619 bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
620 {
621 struct iovec iov[2];
623 iov[0].iov_base = (char *)path;
624 iov[0].iov_len = strlen(path) + 1;
625 iov[1].iov_base = (char *)token;
626 iov[1].iov_len = strlen(token) + 1;
628 return xs_bool(xs_talkv(h, XBT_NULL, XS_UNWATCH, iov,
629 ARRAY_SIZE(iov), NULL));
630 }
632 /* Start a transaction: changes by others will not be seen during this
633 * transaction, and changes will not be visible to others until end.
634 * You can only have one transaction at any time.
635 * Returns XBT_NULL on failure.
636 */
637 xs_transaction_t xs_transaction_start(struct xs_handle *h)
638 {
639 char *id_str;
640 xs_transaction_t id;
642 id_str = xs_single(h, XBT_NULL, XS_TRANSACTION_START, "", NULL);
643 if (id_str == NULL)
644 return XBT_NULL;
646 id = strtoul(id_str, NULL, 0);
647 free(id_str);
649 return id;
650 }
652 /* End a transaction.
653 * If abandon is true, transaction is discarded instead of committed.
654 * Returns false on failure, which indicates an error: transactions will
655 * not fail spuriously.
656 */
657 bool xs_transaction_end(struct xs_handle *h, xs_transaction_t t,
658 bool abort)
659 {
660 char abortstr[2];
662 if (abort)
663 strcpy(abortstr, "F");
664 else
665 strcpy(abortstr, "T");
667 return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
668 }
670 /* Introduce a new domain.
671 * This tells the store daemon about a shared memory page and event channel
672 * associated with a domain: the domain uses these to communicate.
673 */
674 bool xs_introduce_domain(struct xs_handle *h,
675 unsigned int domid, unsigned long mfn,
676 unsigned int eventchn)
677 {
678 char domid_str[MAX_STRLEN(domid)];
679 char mfn_str[MAX_STRLEN(mfn)];
680 char eventchn_str[MAX_STRLEN(eventchn)];
681 struct iovec iov[3];
683 sprintf(domid_str, "%u", domid);
684 sprintf(mfn_str, "%lu", mfn);
685 sprintf(eventchn_str, "%u", eventchn);
687 iov[0].iov_base = domid_str;
688 iov[0].iov_len = strlen(domid_str) + 1;
689 iov[1].iov_base = mfn_str;
690 iov[1].iov_len = strlen(mfn_str) + 1;
691 iov[2].iov_base = eventchn_str;
692 iov[2].iov_len = strlen(eventchn_str) + 1;
694 return xs_bool(xs_talkv(h, XBT_NULL, XS_INTRODUCE, iov,
695 ARRAY_SIZE(iov), NULL));
696 }
698 static void * single_with_domid(struct xs_handle *h,
699 enum xsd_sockmsg_type type,
700 unsigned int domid)
701 {
702 char domid_str[MAX_STRLEN(domid)];
704 sprintf(domid_str, "%u", domid);
706 return xs_single(h, XBT_NULL, type, domid_str, NULL);
707 }
709 bool xs_release_domain(struct xs_handle *h, unsigned int domid)
710 {
711 return xs_bool(single_with_domid(h, XS_RELEASE, domid));
712 }
714 char *xs_get_domain_path(struct xs_handle *h, unsigned int domid)
715 {
716 char domid_str[MAX_STRLEN(domid)];
718 sprintf(domid_str, "%u", domid);
720 return xs_single(h, XBT_NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
721 }
723 bool xs_is_domain_introduced(struct xs_handle *h, unsigned int domid)
724 {
725 return strcmp("F",
726 single_with_domid(h, XS_IS_DOMAIN_INTRODUCED, domid));
727 }
729 /* Only useful for DEBUG versions */
730 char *xs_debug_command(struct xs_handle *h, const char *cmd,
731 void *data, unsigned int len)
732 {
733 struct iovec iov[2];
735 iov[0].iov_base = (void *)cmd;
736 iov[0].iov_len = strlen(cmd) + 1;
737 iov[1].iov_base = data;
738 iov[1].iov_len = len;
740 return xs_talkv(h, XBT_NULL, XS_DEBUG, iov,
741 ARRAY_SIZE(iov), NULL);
742 }
744 static int read_message(struct xs_handle *h)
745 {
746 struct xs_stored_msg *msg = NULL;
747 char *body = NULL;
748 int saved_errno;
750 /* Allocate message structure and read the message header. */
751 msg = malloc(sizeof(*msg));
752 if (msg == NULL)
753 goto error;
754 if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
755 goto error;
757 /* Allocate and read the message body. */
758 body = msg->body = malloc(msg->hdr.len + 1);
759 if (body == NULL)
760 goto error;
761 if (!read_all(h->fd, body, msg->hdr.len))
762 goto error;
763 body[msg->hdr.len] = '\0';
765 if (msg->hdr.type == XS_WATCH_EVENT) {
766 pthread_mutex_lock(&h->watch_mutex);
768 /* Kick users out of their select() loop. */
769 if (list_empty(&h->watch_list) &&
770 (h->watch_pipe[1] != -1))
771 while (write(h->watch_pipe[1], body, 1) != 1)
772 continue;
774 list_add_tail(&msg->list, &h->watch_list);
775 pthread_cond_signal(&h->watch_condvar);
777 pthread_mutex_unlock(&h->watch_mutex);
778 } else {
779 pthread_mutex_lock(&h->reply_mutex);
781 /* There should only ever be one response pending! */
782 if (!list_empty(&h->reply_list)) {
783 pthread_mutex_unlock(&h->reply_mutex);
784 goto error;
785 }
787 list_add_tail(&msg->list, &h->reply_list);
788 pthread_cond_signal(&h->reply_condvar);
790 pthread_mutex_unlock(&h->reply_mutex);
791 }
793 return 0;
795 error:
796 saved_errno = errno;
797 free(msg);
798 free(body);
799 errno = saved_errno;
800 return -1;
801 }
803 static void *read_thread(void *arg)
804 {
805 struct xs_handle *h = arg;
807 while (read_message(h) != -1)
808 continue;
810 return NULL;
811 }
813 /*
814 * Local variables:
815 * c-file-style: "linux"
816 * indent-tabs-mode: t
817 * c-indent-level: 8
818 * c-basic-offset: 8
819 * tab-width: 8
820 * End:
821 */