direct-io.hg

view tools/xenstore/xs.c @ 7480:a90d670c98b9

Change the semantics of GetDomainPath so that it always succeeds, regardless of
whether a domain has been introduced to the store. Added a separate message
XS_IS_DOMAIN_INTRODUCED and API for that (xs_is_domain_introduced) to determine
whether the domain has really been introduced. This change means that the
tools can determine the correct domain path earlier in the domain creation
process, which is particularly a factor with live migration, as it allows us
to create the devices earlier in the process, and unpause the new domain before
performing the introduce. Until recently we already had these features, but
the simplification of the interface between xend and xenstored caused breakage.

No longer clear out the domain path when a domain is introduced -- this was a
hack to work around the recent problematic semantics of GetDomainPath.

Do not write the contents of the info block to the store. All the configuration
info is written to the /vm path, and anything else in the info block is either
dealt with explicitly or is ephemeral and has no place in the store.

Signed-off-by: Ewan Mellor <ewan@xensource.com>
author emellor@leeni.uk.xensource.com
date Sun Oct 23 22:45:15 2005 +0100 (2005-10-23)
parents 75ec60b67f64
children b7afc0be59b2
line source
1 /*
2 Xen Store Daemon interface providing simple tree-like database.
3 Copyright (C) 2005 Rusty Russell IBM Corporation
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 2.1 of the License, or (at your option) any later version.
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <sys/socket.h>
24 #include <sys/un.h>
25 #include <string.h>
26 #include <unistd.h>
27 #include <stdbool.h>
28 #include <stdlib.h>
29 #include <assert.h>
30 #include <stdio.h>
31 #include <signal.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <sys/ioctl.h>
35 #include <pthread.h>
36 #include "xs.h"
37 #include "list.h"
38 #include "utils.h"
40 struct xs_stored_msg {
41 struct list_head list;
42 struct xsd_sockmsg hdr;
43 char *body;
44 };
46 struct xs_handle {
47 /* Communications channel to xenstore daemon. */
48 int fd;
50 /*
51 * A read thread which pulls messages off the comms channel and
52 * signals waiters.
53 */
54 pthread_t read_thr;
55 int read_thr_exists;
57 /*
58 * A list of fired watch messages, protected by a mutex. Users can
59 * wait on the conditional variable until a watch is pending.
60 */
61 struct list_head watch_list;
62 pthread_mutex_t watch_mutex;
63 pthread_cond_t watch_condvar;
65 /* Clients can select() on this pipe to wait for a watch to fire. */
66 int watch_pipe[2];
68 /*
69 * A list of replies. Currently only one will ever be outstanding
70 * because we serialise requests. The requester can wait on the
71 * conditional variable for its response.
72 */
73 struct list_head reply_list;
74 pthread_mutex_t reply_mutex;
75 pthread_cond_t reply_condvar;
77 /* One request at a time. */
78 pthread_mutex_t request_mutex;
79 };
/* Defined at the bottom of the file; needed earlier by the request path. */
static void *read_thread(void *arg);
static int read_message(struct xs_handle *h);
84 int xs_fileno(struct xs_handle *h)
85 {
86 char c = 0;
88 pthread_mutex_lock(&h->watch_mutex);
90 if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
91 /* Kick things off if the watch list is already non-empty. */
92 if (!list_empty(&h->watch_list))
93 while (write(h->watch_pipe[1], &c, 1) != 1)
94 continue;
95 }
97 pthread_mutex_unlock(&h->watch_mutex);
99 return h->watch_pipe[0];
100 }
/*
 * Connect to the Unix-domain stream socket at path connect_to.
 *
 * Returns the connected descriptor, or -1 with errno set.  A path that
 * does not fit in sockaddr_un.sun_path (including its terminating nul)
 * is rejected with ENAMETOOLONG instead of overflowing the address
 * structure.
 */
static int get_socket(const char *connect_to)
{
	struct sockaddr_un addr;
	size_t path_len = strlen(connect_to);
	int sock, saved_errno;

	/* sun_path is a fixed-size array: refuse anything that cannot fit. */
	if (path_len >= sizeof(addr.sun_path)) {
		errno = ENAMETOOLONG;
		return -1;
	}

	sock = socket(PF_UNIX, SOCK_STREAM, 0);
	if (sock < 0)
		return -1;

	/* Zero the whole structure so no stack garbage reaches connect(). */
	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	memcpy(addr.sun_path, connect_to, path_len + 1);

	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
		/* close() may overwrite errno; report the connect() failure. */
		saved_errno = errno;
		close(sock);
		errno = saved_errno;
		return -1;
	}

	return sock;
}
/*
 * Open the file at connect_to for reading and writing.
 * Returns the descriptor from open(), i.e. -1 with errno set on failure.
 */
static int get_dev(const char *connect_to)
{
	int dev_fd = open(connect_to, O_RDWR);

	return dev_fd;
}
129 static struct xs_handle *get_handle(const char *connect_to)
130 {
131 struct stat buf;
132 struct xs_handle *h = NULL;
133 int fd = -1, saved_errno;
135 if (stat(connect_to, &buf) != 0)
136 return NULL;
138 if (S_ISSOCK(buf.st_mode))
139 fd = get_socket(connect_to);
140 else
141 fd = get_dev(connect_to);
143 if (fd == -1)
144 return NULL;
146 h = malloc(sizeof(*h));
147 if (h == NULL) {
148 saved_errno = errno;
149 close(fd);
150 errno = saved_errno;
151 return NULL;
152 }
154 memset(h, 0, sizeof(*h));
156 h->fd = fd;
158 /* Watch pipe is allocated on demand in xs_fileno(). */
159 h->watch_pipe[0] = h->watch_pipe[1] = -1;
161 INIT_LIST_HEAD(&h->watch_list);
162 pthread_mutex_init(&h->watch_mutex, NULL);
163 pthread_cond_init(&h->watch_condvar, NULL);
165 INIT_LIST_HEAD(&h->reply_list);
166 pthread_mutex_init(&h->reply_mutex, NULL);
167 pthread_cond_init(&h->reply_condvar, NULL);
169 pthread_mutex_init(&h->request_mutex, NULL);
171 return h;
172 }
/*
 * Open a handle on the path returned by xs_daemon_socket().
 * Returns NULL on failure, as get_handle() does.
 */
struct xs_handle *xs_daemon_open(void)
{
	const char *path = xs_daemon_socket();

	return get_handle(path);
}
/*
 * Open a handle on the path returned by xs_daemon_socket_ro().
 * Returns NULL on failure, as get_handle() does.
 */
struct xs_handle *xs_daemon_open_readonly(void)
{
	const char *path = xs_daemon_socket_ro();

	return get_handle(path);
}
/*
 * Open a handle on the path returned by xs_domain_dev().
 * Returns NULL on failure, as get_handle() does.
 */
struct xs_handle *xs_domain_open(void)
{
	const char *path = xs_domain_dev();

	return get_handle(path);
}
189 void xs_daemon_close(struct xs_handle *h)
190 {
191 struct xs_stored_msg *msg, *tmsg;
193 pthread_mutex_lock(&h->request_mutex);
194 pthread_mutex_lock(&h->reply_mutex);
195 pthread_mutex_lock(&h->watch_mutex);
197 if (h->read_thr_exists) {
198 /* XXX FIXME: May leak an unpublished message buffer. */
199 pthread_cancel(h->read_thr);
200 pthread_join(h->read_thr, NULL);
201 }
203 list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
204 free(msg->body);
205 free(msg);
206 }
208 list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
209 free(msg->body);
210 free(msg);
211 }
213 pthread_mutex_unlock(&h->request_mutex);
214 pthread_mutex_unlock(&h->reply_mutex);
215 pthread_mutex_unlock(&h->watch_mutex);
217 if (h->watch_pipe[0] != -1) {
218 close(h->watch_pipe[0]);
219 close(h->watch_pipe[1]);
220 }
222 close(h->fd);
224 free(h);
225 }
/*
 * Read exactly len bytes from fd into data, retrying after short reads
 * and after EINTR.
 *
 * Returns true once all len bytes have arrived (trivially for len == 0).
 * Returns false with errno set on a read error, or with errno = EBADF if
 * end-of-file is reached before len bytes were read.
 */
static bool read_all(int fd, void *data, unsigned int len)
{
	/* Byte pointer: arithmetic on void * is not standard C. */
	char *cursor = data;

	while (len) {
		ssize_t done = read(fd, cursor, len);

		if (done < 0) {
			if (errno == EINTR)
				continue;
			return false;
		}
		if (done == 0) {
			/* It closed fd on us?  EBADF is appropriate. */
			errno = EBADF;
			return false;
		}
		cursor += done;
		len -= (unsigned int)done;
	}

	return true;
}
/*
 * When XSTEST is defined, every later use of read_all and xs_write_all in
 * this file is redirected to the *_choice variants.
 */
#if defined(XSTEST)
#define read_all read_all_choice
#define xs_write_all write_all_choice
#endif
255 static int get_error(const char *errorstring)
256 {
257 unsigned int i;
259 for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++)
260 if (i == ARRAY_SIZE(xsd_errors) - 1)
261 return EINVAL;
262 return xsd_errors[i].errnum;
263 }
265 /* Adds extra nul terminator, because we generally (always?) hold strings. */
266 static void *read_reply(
267 struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
268 {
269 struct xs_stored_msg *msg;
270 char *body;
272 /* Read from comms channel ourselves if there is no reader thread. */
273 if (!h->read_thr_exists && (read_message(h) == -1))
274 return NULL;
276 pthread_mutex_lock(&h->reply_mutex);
277 while (list_empty(&h->reply_list))
278 pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
279 msg = list_top(&h->reply_list, struct xs_stored_msg, list);
280 list_del(&msg->list);
281 assert(list_empty(&h->reply_list));
282 pthread_mutex_unlock(&h->reply_mutex);
284 *type = msg->hdr.type;
285 if (len)
286 *len = msg->hdr.len;
287 body = msg->body;
289 free(msg);
291 return body;
292 }
294 /* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
295 static void *xs_talkv(struct xs_handle *h, struct xs_transaction_handle *t,
296 enum xsd_sockmsg_type type,
297 const struct iovec *iovec,
298 unsigned int num_vecs,
299 unsigned int *len)
300 {
301 struct xsd_sockmsg msg;
302 void *ret = NULL;
303 int saved_errno;
304 unsigned int i;
305 struct sigaction ignorepipe, oldact;
307 msg.tx_id = (uint32_t)(unsigned long)t;
308 msg.type = type;
309 msg.len = 0;
310 for (i = 0; i < num_vecs; i++)
311 msg.len += iovec[i].iov_len;
313 ignorepipe.sa_handler = SIG_IGN;
314 sigemptyset(&ignorepipe.sa_mask);
315 ignorepipe.sa_flags = 0;
316 sigaction(SIGPIPE, &ignorepipe, &oldact);
318 pthread_mutex_lock(&h->request_mutex);
320 if (!xs_write_all(h->fd, &msg, sizeof(msg)))
321 goto fail;
323 for (i = 0; i < num_vecs; i++)
324 if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
325 goto fail;
327 ret = read_reply(h, &msg.type, len);
328 if (!ret)
329 goto fail;
331 pthread_mutex_unlock(&h->request_mutex);
333 sigaction(SIGPIPE, &oldact, NULL);
334 if (msg.type == XS_ERROR) {
335 saved_errno = get_error(ret);
336 free(ret);
337 errno = saved_errno;
338 return NULL;
339 }
341 if (msg.type != type) {
342 free(ret);
343 saved_errno = EBADF;
344 goto close_fd;
346 }
347 return ret;
349 fail:
350 /* We're in a bad state, so close fd. */
351 saved_errno = errno;
352 pthread_mutex_unlock(&h->request_mutex);
353 sigaction(SIGPIPE, &oldact, NULL);
354 close_fd:
355 close(h->fd);
356 h->fd = -1;
357 errno = saved_errno;
358 return NULL;
359 }
/* Release p like free() does, but leave errno exactly as it was. */
static void free_no_errno(void *p)
{
	const int errno_before = errno;

	free(p);
	errno = errno_before;
}
369 /* Simplified version of xs_talkv: single message. */
370 static void *xs_single(struct xs_handle *h, struct xs_transaction_handle *t,
371 enum xsd_sockmsg_type type,
372 const char *string,
373 unsigned int *len)
374 {
375 struct iovec iovec;
377 iovec.iov_base = (void *)string;
378 iovec.iov_len = strlen(string) + 1;
379 return xs_talkv(h, t, type, &iovec, 1, len);
380 }
/*
 * Reduce a reply buffer to success/failure: a non-NULL reply is freed and
 * reported as true, a NULL reply is reported as false.
 */
static bool xs_bool(char *reply)
{
	bool ok = (reply != NULL);

	if (ok)
		free(reply);

	return ok;
}
390 char **xs_directory(struct xs_handle *h, struct xs_transaction_handle *t,
391 const char *path, unsigned int *num)
392 {
393 char *strings, *p, **ret;
394 unsigned int len;
396 strings = xs_single(h, t, XS_DIRECTORY, path, &len);
397 if (!strings)
398 return NULL;
400 /* Count the strings. */
401 *num = xs_count_strings(strings, len);
403 /* Transfer to one big alloc for easy freeing. */
404 ret = malloc(*num * sizeof(char *) + len);
405 if (!ret) {
406 free_no_errno(strings);
407 return NULL;
408 }
409 memcpy(&ret[*num], strings, len);
410 free_no_errno(strings);
412 strings = (char *)&ret[*num];
413 for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
414 ret[(*num)++] = p;
415 return ret;
416 }
418 /* Get the value of a single file, nul terminated.
419 * Returns a malloced value: call free() on it after use.
420 * len indicates length in bytes, not including the nul.
421 */
422 void *xs_read(struct xs_handle *h, struct xs_transaction_handle *t,
423 const char *path, unsigned int *len)
424 {
425 return xs_single(h, t, XS_READ, path, len);
426 }
428 /* Write the value of a single file.
429 * Returns false on failure.
430 */
431 bool xs_write(struct xs_handle *h, struct xs_transaction_handle *t,
432 const char *path, const void *data, unsigned int len)
433 {
434 struct iovec iovec[2];
436 iovec[0].iov_base = (void *)path;
437 iovec[0].iov_len = strlen(path) + 1;
438 iovec[1].iov_base = (void *)data;
439 iovec[1].iov_len = len;
441 return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
442 ARRAY_SIZE(iovec), NULL));
443 }
445 /* Create a new directory.
446 * Returns false on failure, or success if it already exists.
447 */
448 bool xs_mkdir(struct xs_handle *h, struct xs_transaction_handle *t,
449 const char *path)
450 {
451 return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
452 }
454 /* Destroy a file or directory (directories must be empty).
455 * Returns false on failure, or success if it doesn't exist.
456 */
457 bool xs_rm(struct xs_handle *h, struct xs_transaction_handle *t,
458 const char *path)
459 {
460 return xs_bool(xs_single(h, t, XS_RM, path, NULL));
461 }
463 /* Get permissions of node (first element is owner).
464 * Returns malloced array, or NULL: call free() after use.
465 */
466 struct xs_permissions *xs_get_permissions(struct xs_handle *h,
467 struct xs_transaction_handle *t,
468 const char *path, unsigned int *num)
469 {
470 char *strings;
471 unsigned int len;
472 struct xs_permissions *ret;
474 strings = xs_single(h, t, XS_GET_PERMS, path, &len);
475 if (!strings)
476 return NULL;
478 /* Count the strings: each one perms then domid. */
479 *num = xs_count_strings(strings, len);
481 /* Transfer to one big alloc for easy freeing. */
482 ret = malloc(*num * sizeof(struct xs_permissions));
483 if (!ret) {
484 free_no_errno(strings);
485 return NULL;
486 }
488 if (!xs_strings_to_perms(ret, *num, strings)) {
489 free_no_errno(ret);
490 ret = NULL;
491 }
493 free(strings);
494 return ret;
495 }
497 /* Set permissions of node (must be owner).
498 * Returns false on failure.
499 */
500 bool xs_set_permissions(struct xs_handle *h,
501 struct xs_transaction_handle *t,
502 const char *path,
503 struct xs_permissions *perms,
504 unsigned int num_perms)
505 {
506 unsigned int i;
507 struct iovec iov[1+num_perms];
509 iov[0].iov_base = (void *)path;
510 iov[0].iov_len = strlen(path) + 1;
512 for (i = 0; i < num_perms; i++) {
513 char buffer[MAX_STRLEN(unsigned int)+1];
515 if (!xs_perm_to_string(&perms[i], buffer))
516 goto unwind;
518 iov[i+1].iov_base = strdup(buffer);
519 iov[i+1].iov_len = strlen(buffer) + 1;
520 if (!iov[i+1].iov_base)
521 goto unwind;
522 }
524 if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
525 goto unwind;
526 for (i = 0; i < num_perms; i++)
527 free(iov[i+1].iov_base);
528 return true;
530 unwind:
531 num_perms = i;
532 for (i = 0; i < num_perms; i++)
533 free_no_errno(iov[i+1].iov_base);
534 return false;
535 }
537 /* Watch a node for changes (poll on fd to detect, or call read_watch()).
538 * When the node (or any child) changes, fd will become readable.
539 * Token is returned when watch is read, to allow matching.
540 * Returns false on failure.
541 */
542 bool xs_watch(struct xs_handle *h, const char *path, const char *token)
543 {
544 struct iovec iov[2];
546 /* We dynamically create a reader thread on demand. */
547 pthread_mutex_lock(&h->request_mutex);
548 if (!h->read_thr_exists) {
549 if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
550 pthread_mutex_unlock(&h->request_mutex);
551 return false;
552 }
553 h->read_thr_exists = 1;
554 }
555 pthread_mutex_unlock(&h->request_mutex);
557 iov[0].iov_base = (void *)path;
558 iov[0].iov_len = strlen(path) + 1;
559 iov[1].iov_base = (void *)token;
560 iov[1].iov_len = strlen(token) + 1;
562 return xs_bool(xs_talkv(h, NULL, XS_WATCH, iov,
563 ARRAY_SIZE(iov), NULL));
564 }
566 /* Find out what node change was on (will block if nothing pending).
567 * Returns array of two pointers: path and token, or NULL.
568 * Call free() after use.
569 */
570 char **xs_read_watch(struct xs_handle *h, unsigned int *num)
571 {
572 struct xs_stored_msg *msg;
573 char **ret, *strings, c = 0;
574 unsigned int num_strings, i;
576 pthread_mutex_lock(&h->watch_mutex);
578 /* Wait on the condition variable for a watch to fire. */
579 while (list_empty(&h->watch_list))
580 pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
581 msg = list_top(&h->watch_list, struct xs_stored_msg, list);
582 list_del(&msg->list);
584 /* Clear the pipe token if there are no more pending watches. */
585 if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
586 while (read(h->watch_pipe[0], &c, 1) != 1)
587 continue;
589 pthread_mutex_unlock(&h->watch_mutex);
591 assert(msg->hdr.type == XS_WATCH_EVENT);
593 strings = msg->body;
594 num_strings = xs_count_strings(strings, msg->hdr.len);
596 ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
597 if (!ret) {
598 free_no_errno(strings);
599 free_no_errno(msg);
600 return NULL;
601 }
603 ret[0] = (char *)(ret + num_strings);
604 memcpy(ret[0], strings, msg->hdr.len);
606 free(strings);
607 free(msg);
609 for (i = 1; i < num_strings; i++)
610 ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
612 *num = num_strings;
614 return ret;
615 }
617 /* Remove a watch on a node.
618 * Returns false on failure (no watch on that node).
619 */
620 bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
621 {
622 struct iovec iov[2];
624 iov[0].iov_base = (char *)path;
625 iov[0].iov_len = strlen(path) + 1;
626 iov[1].iov_base = (char *)token;
627 iov[1].iov_len = strlen(token) + 1;
629 return xs_bool(xs_talkv(h, NULL, XS_UNWATCH, iov,
630 ARRAY_SIZE(iov), NULL));
631 }
633 /* Start a transaction: changes by others will not be seen during this
634 * transaction, and changes will not be visible to others until end.
635 * You can only have one transaction at any time.
636 * Returns NULL on failure.
637 */
638 struct xs_transaction_handle *xs_transaction_start(struct xs_handle *h)
639 {
640 char *id_str;
641 unsigned long id;
643 id_str = xs_single(h, NULL, XS_TRANSACTION_START, "", NULL);
644 if (id_str == NULL)
645 return NULL;
647 id = strtoul(id_str, NULL, 0);
648 free(id_str);
650 return (struct xs_transaction_handle *)id;
651 }
653 /* End a transaction.
654 * If abandon is true, transaction is discarded instead of committed.
655 * Returns false on failure, which indicates an error: transactions will
656 * not fail spuriously.
657 */
658 bool xs_transaction_end(struct xs_handle *h, struct xs_transaction_handle *t,
659 bool abort)
660 {
661 char abortstr[2];
663 if (abort)
664 strcpy(abortstr, "F");
665 else
666 strcpy(abortstr, "T");
668 return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
669 }
671 /* Introduce a new domain.
672 * This tells the store daemon about a shared memory page and event channel
673 * associated with a domain: the domain uses these to communicate.
674 */
675 bool xs_introduce_domain(struct xs_handle *h,
676 unsigned int domid, unsigned long mfn,
677 unsigned int eventchn)
678 {
679 char domid_str[MAX_STRLEN(domid)];
680 char mfn_str[MAX_STRLEN(mfn)];
681 char eventchn_str[MAX_STRLEN(eventchn)];
682 struct iovec iov[3];
684 sprintf(domid_str, "%u", domid);
685 sprintf(mfn_str, "%lu", mfn);
686 sprintf(eventchn_str, "%u", eventchn);
688 iov[0].iov_base = domid_str;
689 iov[0].iov_len = strlen(domid_str) + 1;
690 iov[1].iov_base = mfn_str;
691 iov[1].iov_len = strlen(mfn_str) + 1;
692 iov[2].iov_base = eventchn_str;
693 iov[2].iov_len = strlen(eventchn_str) + 1;
695 return xs_bool(xs_talkv(h, NULL, XS_INTRODUCE, iov,
696 ARRAY_SIZE(iov), NULL));
697 }
699 static void * single_with_domid(struct xs_handle *h,
700 enum xsd_sockmsg_type type,
701 unsigned int domid)
702 {
703 char domid_str[MAX_STRLEN(domid)];
705 sprintf(domid_str, "%u", domid);
707 return xs_single(h, NULL, type, domid_str, NULL);
708 }
710 bool xs_release_domain(struct xs_handle *h, unsigned int domid)
711 {
712 return xs_bool(single_with_domid(h, XS_RELEASE, domid));
713 }
715 char *xs_get_domain_path(struct xs_handle *h, unsigned int domid)
716 {
717 char domid_str[MAX_STRLEN(domid)];
719 sprintf(domid_str, "%u", domid);
721 return xs_single(h, NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
722 }
724 bool xs_is_domain_introduced(struct xs_handle *h, unsigned int domid)
725 {
726 return strcmp("F",
727 single_with_domid(h, XS_IS_DOMAIN_INTRODUCED, domid));
728 }
730 /* Only useful for DEBUG versions */
731 char *xs_debug_command(struct xs_handle *h, const char *cmd,
732 void *data, unsigned int len)
733 {
734 struct iovec iov[2];
736 iov[0].iov_base = (void *)cmd;
737 iov[0].iov_len = strlen(cmd) + 1;
738 iov[1].iov_base = data;
739 iov[1].iov_len = len;
741 return xs_talkv(h, NULL, XS_DEBUG, iov,
742 ARRAY_SIZE(iov), NULL);
743 }
745 static int read_message(struct xs_handle *h)
746 {
747 struct xs_stored_msg *msg = NULL;
748 char *body = NULL;
749 int saved_errno;
751 /* Allocate message structure and read the message header. */
752 msg = malloc(sizeof(*msg));
753 if (msg == NULL)
754 goto error;
755 if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
756 goto error;
758 /* Allocate and read the message body. */
759 body = msg->body = malloc(msg->hdr.len + 1);
760 if (body == NULL)
761 goto error;
762 if (!read_all(h->fd, body, msg->hdr.len))
763 goto error;
764 body[msg->hdr.len] = '\0';
766 if (msg->hdr.type == XS_WATCH_EVENT) {
767 pthread_mutex_lock(&h->watch_mutex);
769 /* Kick users out of their select() loop. */
770 if (list_empty(&h->watch_list) &&
771 (h->watch_pipe[1] != -1))
772 while (write(h->watch_pipe[1], body, 1) != 1)
773 continue;
775 list_add_tail(&msg->list, &h->watch_list);
776 pthread_cond_signal(&h->watch_condvar);
778 pthread_mutex_unlock(&h->watch_mutex);
779 } else {
780 pthread_mutex_lock(&h->reply_mutex);
782 /* There should only ever be one response pending! */
783 if (!list_empty(&h->reply_list)) {
784 pthread_mutex_unlock(&h->reply_mutex);
785 goto error;
786 }
788 list_add_tail(&msg->list, &h->reply_list);
789 pthread_cond_signal(&h->reply_condvar);
791 pthread_mutex_unlock(&h->reply_mutex);
792 }
794 return 0;
796 error:
797 saved_errno = errno;
798 free(msg);
799 free(body);
800 errno = saved_errno;
801 return -1;
802 }
804 static void *read_thread(void *arg)
805 {
806 struct xs_handle *h = arg;
808 while (read_message(h) != -1)
809 continue;
811 return NULL;
812 }
814 /*
815 * Local variables:
816 * c-file-style: "linux"
817 * indent-tabs-mode: t
818 * c-indent-level: 8
819 * c-basic-offset: 8
820 * tab-width: 8
821 * End:
822 */