ia64/xen-unstable

view tools/xenstore/xenstored_watch.c @ 5867:932fc8a1b38d

# HG changeset patch
# User Rusty Russell <rusty@rustcorp.com.au>
# Node ID a92163adedcfcff0d05c965d09da747f3c8aa13e
# Parent 63ab20781afa311300f3a8e832744292014ea7f6

Remove the ill-conceived concept of watches blocking the reply on the connection which did the write/mkdir/rm/setperm etc.
This causes deadlocks in real life, and I can't see a sane way of avoiding them: it is reasonable for a client to ignore watch notifications while doing other actions, which means it can issue further writes. Those writes can block pending acks from other watchers; if one of those watchers is the process blocked awaiting our ack, we deadlock.
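A toy illustration of that cycle (not part of this changeset, and not the real libxenstore API; the connection state flags below are invented for the example): each side's reply is held back until the other side acks its watch event, and each side only acks once its own reply has arrived, so neither can ever make progress.

/* Toy model of the removed behaviour; not xenstored code. */
#include <stdbool.h>
#include <stdio.h>

struct conn {
	const char *name;
	bool awaiting_reply;	/* wrote something, reply not yet sent */
	bool owes_ack;		/* received a watch event, not yet acked */
};

int main(void)
{
	/* A and B each write a node the other is watching. */
	struct conn a = { "A", true, true };
	struct conn b = { "B", true, true };
	bool progress = true;

	while (progress) {
		progress = false;

		/* Old rule: a reply may only be sent once the other
		 * side has acked the watch event the write caused. */
		if (a.awaiting_reply && !b.owes_ack) {
			a.awaiting_reply = false;
			progress = true;
		}
		if (b.awaiting_reply && !a.owes_ack) {
			b.awaiting_reply = false;
			progress = true;
		}

		/* Reasonable client policy: only ack after the reply
		 * to our own write has arrived. */
		if (a.owes_ack && !a.awaiting_reply) {
			a.owes_ack = false;
			progress = true;
		}
		if (b.owes_ack && !b.awaiting_reply) {
			b.owes_ack = false;
			progress = true;
		}
	}

	if (a.awaiting_reply || b.awaiting_reply)
		printf("deadlock: no reply or ack can ever be sent\n");
	else
		printf("all replies delivered\n");
	return 0;
}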

author cl349@firebug.cl.cam.ac.uk
date Tue Jul 26 13:11:01 2005 +0000 (2005-07-26)
parents a83ac0806d6b
children 99366b44c421
line source
/*
    Watch code for Xen Store Daemon.
    Copyright (C) 2005 Rusty Russell IBM Corporation

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

#include <stdio.h>
#include <sys/types.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <assert.h>
#include "talloc.h"
#include "list.h"
#include "xenstored_watch.h"
#include "xs_lib.h"
#include "utils.h"
#include "xenstored_test.h"
#include "xenstored_domain.h"

/* FIXME: time out unacked watches. */

/* We create this if anyone is interested in "node", then we pass it
 * from watch to watch as each connection acks it.
 */
struct watch_event
{
	/* The watch we are firing for (watch->events) */
	struct list_head list;

	/* Watches we need to fire for (watches[0]->events == this). */
	struct watch **watches;
	unsigned int num_watches;

	struct timeval timeout;

	/* Name of node which changed. */
	char *node;

	/* For remove, we trigger on all the children of this node too. */
	bool recurse;
};

struct watch
{
	struct list_head list;
	unsigned int priority;

	/* Current outstanding events applying to this watch. */
	struct list_head events;

	/* Is this relative to connection's implicit path? */
	bool relative;

	char *token;
	char *node;
	struct connection *conn;
};

/* Global list of watches, kept in priority order. */
static LIST_HEAD(watches);

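/* Return the first pending event on any of this connection's watches
 * (the global watch list is kept in priority order). */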
static struct watch_event *get_first_event(struct connection *conn)
{
	struct watch *watch;
	struct watch_event *event;

	/* Find first watch with an event. */
	list_for_each_entry(watch, &watches, list) {
		if (watch->conn != conn)
			continue;

		event = list_top(&watch->events, struct watch_event, list);
		if (event)
			return event;
	}
	return NULL;
}

/* Look through our watches: if any of them have an event, queue it. */
void queue_next_event(struct connection *conn)
{
	struct watch_event *event;
	const char *node;
	char *buffer;
	unsigned int len;

	/* We had a reply queued already?  Send it: other end will
	 * discard watch. */
	if (conn->waiting_reply) {
		conn->out = conn->waiting_reply;
		conn->waiting_reply = NULL;
		conn->waiting_for_ack = false;
		return;
	}

	/* If we're already waiting for ack, don't queue more. */
	if (conn->waiting_for_ack)
		return;

	event = get_first_event(conn);
	if (!event)
		return;

	/* If we decide to cancel, we will reset this. */
	conn->waiting_for_ack = true;

	/* If we deleted /foo and they're watching /foo/bar, that's what we
	 * tell them has changed. */
	if (!is_child(event->node, event->watches[0]->node)) {
		assert(event->recurse);
		node = event->watches[0]->node;
	} else
		node = event->node;

	/* If watch placed using relative path, give them relative answer. */
	if (event->watches[0]->relative) {
		node += strlen(get_implicit_path(conn));
		if (node[0] == '/') /* Could be "". */
			node++;
	}

	/* Create reply from path and token */
	len = strlen(node) + 1 + strlen(event->watches[0]->token) + 1;
	buffer = talloc_array(conn, char, len);
	strcpy(buffer, node);
	strcpy(buffer + strlen(node) + 1, event->watches[0]->token);
	send_reply(conn, XS_WATCH_EVENT, buffer, len);
	talloc_free(buffer);
}

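/* Collect the watches a change to "node" should fire: watches on the
 * node or an ancestor, plus (for recursive changes) its children. */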
static struct watch **find_watches(const char *node, bool recurse,
				   unsigned int *num)
{
	struct watch *i;
	struct watch **ret = NULL;

	*num = 0;

	/* We include children too if this is an rm. */
	list_for_each_entry(i, &watches, list) {
		if (is_child(node, i->node) ||
		    (recurse && is_child(i->node, node))) {
			(*num)++;
			ret = talloc_realloc(node, ret, struct watch *, *num);
			ret[*num - 1] = i;
		}
	}
	return ret;
}

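/* A node changed: build an event and queue it on the first (highest
 * priority) matching watch. */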
/* FIXME: we fail to fire on out of memory.  Should drop connections. */
void fire_watches(struct transaction *trans, const char *node, bool recurse)
{
	struct watch **watches;
	struct watch_event *event;
	unsigned int num_watches;

	/* During transactions, don't fire watches. */
	if (trans)
		return;

	watches = find_watches(node, recurse, &num_watches);
	if (!watches)
		return;

	/* Create and fill in info about event. */
	event = talloc(talloc_autofree_context(), struct watch_event);
	event->node = talloc_strdup(event, node);

	/* Tie event to this watch. */
	event->watches = watches;
	talloc_steal(event, watches);
	event->num_watches = num_watches;
	event->recurse = recurse;
	list_add_tail(&event->list, &watches[0]->events);

	/* Warn if not finished after thirty seconds. */
	gettimeofday(&event->timeout, NULL);
	event->timeout.tv_sec += 30;

	/* If connection not doing anything, queue this. */
	if (!watches[0]->conn->out)
		queue_next_event(watches[0]->conn);
}

/* We're done with this event: see if anyone else wants it. */
static void move_event_onwards(struct watch_event *event)
{
	list_del(&event->list);

	event->num_watches--;
	event->watches++;
	if (!event->num_watches) {
		talloc_free(event);
		return;
	}

	list_add_tail(&event->list, &event->watches[0]->events);

	/* If connection not doing anything, queue this. */
	if (!event->watches[0]->conn->out)
		queue_next_event(event->watches[0]->conn);
}

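/* Strip a dying watch out of every pending event's watch array so the
 * event is never offered to it. */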
static void remove_watch_from_events(struct watch *dying_watch)
{
	struct watch *watch;
	struct watch_event *event;
	unsigned int i;

	list_for_each_entry(watch, &watches, list) {
		list_for_each_entry(event, &watch->events, list) {
			for (i = 0; i < event->num_watches; i++) {
				if (event->watches[i] != dying_watch)
					continue;

				assert(i != 0);
				memmove(event->watches + i,
					event->watches + i + 1,
					(event->num_watches - (i + 1))
					* sizeof(struct watch *));
				event->num_watches--;
			}
		}
	}
}

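/* talloc destructor: hand pending events on to the remaining watchers
 * and unlink the watch from the global list. */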
static int destroy_watch(void *_watch)
{
	struct watch *watch = _watch;
	struct watch_event *event;

	/* If we have pending events, pass them on to others. */
	while ((event = list_top(&watch->events, struct watch_event, list)))
		move_event_onwards(event);

	/* Remove from global list. */
	list_del(&watch->list);

	/* Other events which match this watch must be cleared. */
	remove_watch_from_events(watch);

	trace_destroy(watch, "watch");
	return 0;
}

/* We keep watches in priority order. */
static void insert_watch(struct watch *watch)
{
	struct watch *i;

	list_for_each_entry(i, &watches, list) {
		if (i->priority <= watch->priority) {
			list_add_tail(&watch->list, &i->list);
			return;
		}
	}

	list_add_tail(&watch->list, &watches);
}

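/* Report the earliest pending watch-ack timeout (if any) via *tv. */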
void shortest_watch_ack_timeout(struct timeval *tv)
{
	struct watch *watch;

	list_for_each_entry(watch, &watches, list) {
		struct watch_event *i;
		list_for_each_entry(i, &watch->events, list) {
			if (!timerisset(&i->timeout))
				continue;
			if (!timerisset(tv) || timercmp(&i->timeout, tv, <))
				*tv = i->timeout;
		}
	}
}

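/* Warn about events which have gone unacked past their timeout, then
 * clear the timeout so we only warn once per event. */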
void check_watch_ack_timeout(void)
{
	struct watch *watch;
	struct timeval now;

	gettimeofday(&now, NULL);
	list_for_each_entry(watch, &watches, list) {
		struct watch_event *i, *tmp;
		list_for_each_entry_safe(i, tmp, &watch->events, list) {
			if (!timerisset(&i->timeout))
				continue;
			if (timercmp(&i->timeout, &now, <)) {
				xprintf("Warning: timeout on watch event %s"
					" token %s\n",
					i->node, watch->token);
				trace_watch_timeout(watch->conn, i->node,
						    watch->token);
				timerclear(&i->timeout);
			}
		}
	}
}

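/* Handle XS_WATCH: register a watch on a node with the given token and
 * priority. */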
bool do_watch(struct connection *conn, struct buffered_data *in)
{
	struct watch *watch;
	char *vec[3];
	bool relative;

	if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec))
		return send_error(conn, EINVAL);

	relative = !strstarts(vec[0], "/");
	vec[0] = canonicalize(conn, vec[0]);
	if (!check_node_perms(conn, vec[0], XS_PERM_READ))
		return send_error(conn, errno);

	watch = talloc(conn, struct watch);
	watch->node = talloc_strdup(watch, vec[0]);
	watch->token = talloc_strdup(watch, vec[1]);
	watch->conn = conn;
	watch->priority = strtoul(vec[2], NULL, 0);
	watch->relative = relative;
	INIT_LIST_HEAD(&watch->events);

	insert_watch(watch);
	talloc_set_destructor(watch, destroy_watch);
	trace_create(watch, "watch");
	return send_ack(conn, XS_WATCH);
}

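/* Handle XS_WATCH_ACK: the client has dealt with its current event, so
 * pass the event on to the next watcher. */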
bool do_watch_ack(struct connection *conn, const char *token)
{
	struct watch_event *event;

	if (!token)
		return send_error(conn, EINVAL);

	if (!conn->waiting_for_ack)
		return send_error(conn, ENOENT);

	event = get_first_event(conn);
	if (!streq(event->watches[0]->token, token))
		return send_error(conn, EINVAL);

	move_event_onwards(event);
	conn->waiting_for_ack = false;
	return send_ack(conn, XS_WATCH_ACK);
}

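/* Handle XS_UNWATCH: remove this connection's watch matching the given
 * node and token. */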
bool do_unwatch(struct connection *conn, struct buffered_data *in)
{
	struct watch *watch;
	char *node, *vec[2];

	if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec))
		return send_error(conn, EINVAL);

	/* We don't need to worry if we're waiting for an ack for the
	 * watch we're deleting: conn->waiting_for_ack was reset by
	 * this command in consider_message anyway. */
	node = canonicalize(conn, vec[0]);
	list_for_each_entry(watch, &watches, list) {
		if (watch->conn != conn)
			continue;

		if (streq(watch->node, node) && streq(watch->token, vec[1])) {
			talloc_free(watch);
			return send_ack(conn, XS_UNWATCH);
		}
	}
	return send_error(conn, ENOENT);
}

#ifdef TESTING
void dump_watches(struct connection *conn)
{
	struct watch *watch;
	struct watch_event *event;

	/* Dump this connection's watches and their pending events. */
	list_for_each_entry(watch, &watches, list) {
		if (watch->conn != conn)
			continue;

		printf("  watch on %s token %s prio %i\n",
		       watch->node, watch->token, watch->priority);
		list_for_each_entry(event, &watch->events, list)
			printf("    event: %s\n", event->node);
	}
}
#endif