direct-io.hg

changeset 7306:46bd7564125d

Xenstore client library spawns a reader thread the first
time a watch is registered. Before this it is fine for
caller threads to read the comms channel directly as no
async messages will be received.

This avoids various user tools needlessly creating three
threads where one will do the job.

Signed-off-by: Keir Fraser <keir@xensource.com>
author kaf24@firebug.cl.cam.ac.uk
date Tue Oct 11 13:02:59 2005 +0100 (2005-10-11)
parents 5cca372aec05
children 2d2414d6f938 903d88857972
files tools/xenstore/xs.c
line diff
     1.1 --- a/tools/xenstore/xs.c	Tue Oct 11 12:39:03 2005 +0100
     1.2 +++ b/tools/xenstore/xs.c	Tue Oct 11 13:02:59 2005 +0100
     1.3 @@ -52,6 +52,7 @@ struct xs_handle {
     1.4           * signals waiters.
     1.5           */
     1.6  	pthread_t read_thr;
     1.7 +	int read_thr_exists;
     1.8  
     1.9  	/*
    1.10           * A list of fired watch messages, protected by a mutex. Users can
    1.11 @@ -77,6 +78,7 @@ struct xs_handle {
    1.12  	pthread_mutex_t request_mutex;
    1.13  };
    1.14  
    1.15 +static int read_message(struct xs_handle *h);
    1.16  static void *read_thread(void *arg);
    1.17  
    1.18  int xs_fileno(struct xs_handle *h)
    1.19 @@ -131,7 +133,7 @@ static struct xs_handle *get_handle(cons
    1.20  	int fd = -1, saved_errno;
    1.21  
    1.22  	if (stat(connect_to, &buf) != 0)
    1.23 -		goto error;
    1.24 +		return NULL;
    1.25  
    1.26  	if (S_ISSOCK(buf.st_mode))
    1.27  		fd = get_socket(connect_to);
    1.28 @@ -139,11 +141,17 @@ static struct xs_handle *get_handle(cons
    1.29  		fd = get_dev(connect_to);
    1.30  
    1.31  	if (fd == -1)
    1.32 -		goto error;
    1.33 +		return NULL;
    1.34  
    1.35  	h = malloc(sizeof(*h));
    1.36 -	if (h == NULL)
    1.37 -		goto error;
    1.38 +	if (h == NULL) {
    1.39 +		saved_errno = errno;
    1.40 +		close(fd);
    1.41 +		errno = saved_errno;
    1.42 +		return NULL;
    1.43 +	}
    1.44 +
    1.45 +	memset(h, 0, sizeof(*h));
    1.46  
    1.47  	h->fd = fd;
    1.48  
    1.49 @@ -160,19 +168,7 @@ static struct xs_handle *get_handle(cons
    1.50  
    1.51  	pthread_mutex_init(&h->request_mutex, NULL);
    1.52  
    1.53 -	if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
    1.54 -		goto error;
    1.55 -
    1.56  	return h;
    1.57 -
    1.58 - error:
    1.59 -	saved_errno = errno;
    1.60 -	if (h != NULL)
    1.61 -		free(h);
    1.62 -	if (fd != -1)
    1.63 -		close(fd);
    1.64 -	errno = saved_errno;
    1.65 -	return NULL;
    1.66  }
    1.67  
    1.68  struct xs_handle *xs_daemon_open(void)
    1.69 @@ -198,9 +194,11 @@ void xs_daemon_close(struct xs_handle *h
    1.70  	pthread_mutex_lock(&h->reply_mutex);
    1.71  	pthread_mutex_lock(&h->watch_mutex);
    1.72  
    1.73 -	/* XXX FIXME: May leak an unpublished message buffer. */
    1.74 -	pthread_cancel(h->read_thr);
    1.75 -	pthread_join(h->read_thr, NULL);
    1.76 +	if (h->read_thr_exists) {
    1.77 +		/* XXX FIXME: May leak an unpublished message buffer. */
    1.78 +		pthread_cancel(h->read_thr);
    1.79 +		pthread_join(h->read_thr, NULL);
    1.80 +	}
    1.81  
    1.82  	list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
    1.83  		free(msg->body);
    1.84 @@ -271,6 +269,10 @@ static void *read_reply(
    1.85  	struct xs_stored_msg *msg;
    1.86  	char *body;
    1.87  
    1.88 +	/* Read from comms channel ourselves if there is no reader thread. */
    1.89 +	if (!h->read_thr_exists && (read_message(h) == -1))
    1.90 +		return NULL;
    1.91 +
    1.92  	pthread_mutex_lock(&h->reply_mutex);
    1.93  	while (list_empty(&h->reply_list))
    1.94  		pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
    1.95 @@ -541,6 +543,17 @@ bool xs_watch(struct xs_handle *h, const
    1.96  {
    1.97  	struct iovec iov[2];
    1.98  
    1.99 +	/* We dynamically create a reader thread on demand. */
   1.100 +	pthread_mutex_lock(&h->request_mutex);
   1.101 +	if (!h->read_thr_exists) {
   1.102 +		if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
   1.103 +			pthread_mutex_unlock(&h->request_mutex);
   1.104 +			return false;
   1.105 +		}
   1.106 +		h->read_thr_exists = 1;
   1.107 +	}
   1.108 +	pthread_mutex_unlock(&h->request_mutex);
   1.109 +
   1.110  	iov[0].iov_base = (void *)path;
   1.111  	iov[0].iov_len = strlen(path) + 1;
   1.112  	iov[1].iov_base = (void *)token;
   1.113 @@ -717,65 +730,72 @@ char *xs_debug_command(struct xs_handle 
   1.114  			ARRAY_SIZE(iov), NULL);
   1.115  }
   1.116  
   1.117 +static int read_message(struct xs_handle *h)
   1.118 +{
   1.119 +	struct xs_stored_msg *msg = NULL;
   1.120 +	char *body = NULL;
   1.121 +	int saved_errno;
   1.122 +
   1.123 +	/* Allocate message structure and read the message header. */
   1.124 +	msg = malloc(sizeof(*msg));
   1.125 +	if (msg == NULL)
   1.126 +		goto error;
   1.127 +	if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
   1.128 +		goto error;
   1.129 +
   1.130 +	/* Allocate and read the message body. */
   1.131 +	body = msg->body = malloc(msg->hdr.len + 1);
   1.132 +	if (body == NULL)
   1.133 +		goto error;
   1.134 +	if (!read_all(h->fd, body, msg->hdr.len))
   1.135 +		goto error;
   1.136 +	body[msg->hdr.len] = '\0';
   1.137 +
   1.138 +	if (msg->hdr.type == XS_WATCH_EVENT) {
   1.139 +		pthread_mutex_lock(&h->watch_mutex);
   1.140 +
   1.141 +		/* Kick users out of their select() loop. */
   1.142 +		if (list_empty(&h->watch_list) &&
   1.143 +		    (h->watch_pipe[1] != -1))
   1.144 +			while (write(h->watch_pipe[1], body, 1) != 1)
   1.145 +				continue;
   1.146 +
   1.147 +		list_add_tail(&msg->list, &h->watch_list);
   1.148 +		pthread_cond_signal(&h->watch_condvar);
   1.149 +
   1.150 +		pthread_mutex_unlock(&h->watch_mutex);
   1.151 +	} else {
   1.152 +		pthread_mutex_lock(&h->reply_mutex);
   1.153 +
   1.154 +		/* There should only ever be one response pending! */
   1.155 +		if (!list_empty(&h->reply_list)) {
   1.156 +			pthread_mutex_unlock(&h->reply_mutex);
   1.157 +			goto error;
   1.158 +		}
   1.159 +
   1.160 +		list_add_tail(&msg->list, &h->reply_list);
   1.161 +		pthread_cond_signal(&h->reply_condvar);
   1.162 +
   1.163 +		pthread_mutex_unlock(&h->reply_mutex);
   1.164 +	}
   1.165 +
   1.166 +	return 0;
   1.167 +
   1.168 + error:
   1.169 +	saved_errno = errno;
   1.170 +	free(msg);
   1.171 +	free(body);
   1.172 +	errno = saved_errno;
   1.173 +	return -1;
   1.174 +}
   1.175 +
   1.176  static void *read_thread(void *arg)
   1.177  {
   1.178  	struct xs_handle *h = arg;
   1.179 -	struct xs_stored_msg *msg = NULL;
   1.180 -	char *body = NULL;
   1.181 -
   1.182 -	for (;;) {
   1.183 -		msg = NULL;
   1.184 -		body = NULL;
   1.185 -
   1.186 -		/* Allocate message structure and read the message header. */
   1.187 -		msg = malloc(sizeof(*msg));
   1.188 -		if (msg == NULL)
   1.189 -			goto error;
   1.190 -		if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
   1.191 -			goto error;
   1.192 -
   1.193 -		/* Allocate and read the message body. */
   1.194 -		body = msg->body = malloc(msg->hdr.len + 1);
   1.195 -		if (body == NULL)
   1.196 -			goto error;
   1.197 -		if (!read_all(h->fd, body, msg->hdr.len))
   1.198 -			goto error;
   1.199 -		body[msg->hdr.len] = '\0';
   1.200 -
   1.201 -		if (msg->hdr.type == XS_WATCH_EVENT) {
   1.202 -			pthread_mutex_lock(&h->watch_mutex);
   1.203  
   1.204 -			/* Kick users out of their select() loop. */
   1.205 -			if (list_empty(&h->watch_list) &&
   1.206 -			    (h->watch_pipe[1] != -1))
   1.207 -				while (write(h->watch_pipe[1], body, 1) != 1)
   1.208 -					continue;
   1.209 -
   1.210 -			list_add_tail(&msg->list, &h->watch_list);
   1.211 -			pthread_cond_signal(&h->watch_condvar);
   1.212 -
   1.213 -			pthread_mutex_unlock(&h->watch_mutex);
   1.214 -		} else {
   1.215 -			pthread_mutex_lock(&h->reply_mutex);
   1.216 +	while (read_message(h) != -1)
   1.217 +		continue;
   1.218  
   1.219 -			/* There should only ever be one response pending! */
   1.220 -			if (!list_empty(&h->reply_list)) {
   1.221 -				pthread_mutex_unlock(&h->reply_mutex);
   1.222 -				goto error;
   1.223 -			}
   1.224 -
   1.225 -			list_add_tail(&msg->list, &h->reply_list);
   1.226 -			pthread_cond_signal(&h->reply_condvar);
   1.227 -
   1.228 -			pthread_mutex_unlock(&h->reply_mutex);
   1.229 -		}
   1.230 -	}
   1.231 -
   1.232 - error:
   1.233 -	if (body != NULL)
   1.234 -		free(body);
   1.235 -	if (msg != NULL)
   1.236 -		free(msg);
   1.237  	return NULL;
   1.238  }
   1.239