direct-io.hg

changeset 7289:5134f3c512c8

Fix cancellation of pending watch events on watch unregistration.
Use wait_event_interruptible() so that our kernel threads spend
their time in the more acceptable 'S' state rather than the more
worrying 'D' state.

Signed-off-by: Keir Fraser <keir@xensource.com>
author kaf24@firebug.cl.cam.ac.uk
date Mon Oct 10 18:16:03 2005 +0100 (2005-10-10)
parents 03d69dbea152
children 3a341763d8b8
files linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c
line diff
     1.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c	Mon Oct 10 16:57:41 2005 +0100
     1.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c	Mon Oct 10 18:16:03 2005 +0100
     1.3 @@ -128,7 +128,7 @@ int xb_write(const void *data, unsigned 
     1.4  		void *dst;
     1.5  		unsigned int avail;
     1.6  
     1.7 -		wait_event(xb_waitq, output_avail(out));
     1.8 +		wait_event_interruptible(xb_waitq, output_avail(out));
     1.9  
    1.10  		mb();
    1.11  		h = *out;
    1.12 @@ -136,6 +136,8 @@ int xb_write(const void *data, unsigned 
    1.13  			return -EIO;
    1.14  
    1.15  		dst = get_output_chunk(&h, out->buf, &avail);
    1.16 +		if (avail == 0)
    1.17 +			continue;
    1.18  		if (avail > len)
    1.19  			avail = len;
    1.20  		memcpy(dst, data, avail);
    1.21 @@ -167,7 +169,7 @@ int xb_read(void *data, unsigned len)
    1.22  		unsigned int avail;
    1.23  		const char *src;
    1.24  
    1.25 -		wait_event(xb_waitq, xs_input_avail());
    1.26 +		wait_event_interruptible(xb_waitq, xs_input_avail());
    1.27  
    1.28  		mb();
    1.29  		h = *in;
    1.30 @@ -175,6 +177,8 @@ int xb_read(void *data, unsigned len)
    1.31  			return -EIO;
    1.32  
    1.33  		src = get_input_chunk(&h, in->buf, &avail);
    1.34 +		if (avail == 0)
    1.35 +			continue;
    1.36  		if (avail > len)
    1.37  			avail = len;
    1.38  		was_full = !output_avail(&h);
     2.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c	Mon Oct 10 16:57:41 2005 +0100
     2.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c	Mon Oct 10 18:16:03 2005 +0100
     2.3 @@ -43,9 +43,6 @@
     2.4  
     2.5  static struct notifier_block *xenstore_chain;
     2.6  
     2.7 -/* Now used to protect xenbus probes against save/restore. */
     2.8 -static DECLARE_MUTEX(xenbus_lock);
     2.9 -
    2.10  /* If something in array of ids matches this device, return it. */
    2.11  static const struct xenbus_device_id *
    2.12  match_device(const struct xenbus_device_id *arr, struct xenbus_device *dev)
    2.13 @@ -232,18 +229,13 @@ static int xenbus_dev_remove(struct devi
    2.14  static int xenbus_register_driver_common(struct xenbus_driver *drv,
    2.15  					 struct xen_bus_type *bus)
    2.16  {
    2.17 -	int err;
    2.18 -
    2.19  	drv->driver.name = drv->name;
    2.20  	drv->driver.bus = &bus->bus;
    2.21  	drv->driver.owner = drv->owner;
    2.22  	drv->driver.probe = xenbus_dev_probe;
    2.23  	drv->driver.remove = xenbus_dev_remove;
    2.24  
    2.25 -	down(&xenbus_lock);
    2.26 -	err = driver_register(&drv->driver);
    2.27 -	up(&xenbus_lock);
    2.28 -	return err;
    2.29 +	return driver_register(&drv->driver);
    2.30  }
    2.31  
    2.32  int xenbus_register_driver(struct xenbus_driver *drv)
    2.33 @@ -259,9 +251,7 @@ int xenbus_register_backend(struct xenbu
    2.34  
    2.35  void xenbus_unregister_driver(struct xenbus_driver *drv)
    2.36  {
    2.37 -	down(&xenbus_lock);
    2.38  	driver_unregister(&drv->driver);
    2.39 -	up(&xenbus_lock);
    2.40  }
    2.41  EXPORT_SYMBOL(xenbus_unregister_driver);
    2.42  
    2.43 @@ -624,8 +614,6 @@ static int resume_dev(struct device *dev
    2.44  
    2.45  void xenbus_suspend(void)
    2.46  {
    2.47 -	/* We keep lock, so no comms can happen as page moves. */
    2.48 -	down(&xenbus_lock);
    2.49  	bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, suspend_dev);
    2.50  	bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, suspend_dev);
    2.51  	xs_suspend();
    2.52 @@ -637,32 +625,25 @@ void xenbus_resume(void)
    2.53  	xs_resume();
    2.54  	bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, resume_dev);
    2.55  	bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, resume_dev);
    2.56 -	up(&xenbus_lock);
    2.57  }
    2.58  
    2.59  int register_xenstore_notifier(struct notifier_block *nb)
    2.60  {
    2.61  	int ret = 0;
    2.62  
    2.63 -	down(&xenbus_lock);
    2.64 -
    2.65  	if (xen_start_info->store_evtchn) {
    2.66  		ret = nb->notifier_call(nb, 0, NULL);
    2.67  	} else {
    2.68  		notifier_chain_register(&xenstore_chain, nb);
    2.69  	}
    2.70  
    2.71 -	up(&xenbus_lock);
    2.72 -
    2.73  	return ret;
    2.74  }
    2.75  EXPORT_SYMBOL(register_xenstore_notifier);
    2.76  
    2.77  void unregister_xenstore_notifier(struct notifier_block *nb)
    2.78  {
    2.79 -	down(&xenbus_lock);
    2.80  	notifier_chain_unregister(&xenstore_chain, nb);
    2.81 -	up(&xenbus_lock);
    2.82  }
    2.83  EXPORT_SYMBOL(unregister_xenstore_notifier);
    2.84  
    2.85 @@ -683,16 +664,16 @@ int do_xenbus_probe(void *unused)
    2.86  		return err;
    2.87  	}
    2.88  
    2.89 -	down(&xenbus_lock);
    2.90  	/* Enumerate devices in xenstore. */
    2.91  	xenbus_probe_devices(&xenbus_frontend);
    2.92  	xenbus_probe_devices(&xenbus_backend);
    2.93 +
    2.94  	/* Watch for changes. */
    2.95  	register_xenbus_watch(&fe_watch);
    2.96  	register_xenbus_watch(&be_watch);
    2.97 +
    2.98  	/* Notify others that xenstore is up */
    2.99  	notifier_call_chain(&xenstore_chain, 0, 0);
   2.100 -	up(&xenbus_lock);
   2.101  
   2.102  	return 0;
   2.103  }
     3.1 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c	Mon Oct 10 16:57:41 2005 +0100
     3.2 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c	Mon Oct 10 18:16:03 2005 +0100
     3.3 @@ -43,18 +43,18 @@
     3.4  #define streq(a, b) (strcmp((a), (b)) == 0)
     3.5  
     3.6  struct xs_stored_msg {
     3.7 +	struct list_head list;
     3.8 +
     3.9  	struct xsd_sockmsg hdr;
    3.10  
    3.11  	union {
    3.12 -		/* Stored replies. */
    3.13 +		/* Queued replies. */
    3.14  		struct {
    3.15 -			struct list_head list;
    3.16  			char *body;
    3.17  		} reply;
    3.18  
    3.19 -		/* Queued watch callbacks. */
    3.20 +		/* Queued watch events. */
    3.21  		struct {
    3.22 -			struct work_struct work;
    3.23  			struct xenbus_watch *handle;
    3.24  			char **vec;
    3.25  			unsigned int vec_size;
    3.26 @@ -77,9 +77,23 @@ struct xs_handle {
    3.27  
    3.28  static struct xs_handle xs_state;
    3.29  
    3.30 +/* List of registered watches, and a lock to protect it. */
    3.31  static LIST_HEAD(watches);
    3.32  static DEFINE_SPINLOCK(watches_lock);
    3.33 -static struct workqueue_struct *watches_workq;
    3.34 +
    3.35 +/* List of pending watch calbback events, and a lock to protect it. */
    3.36 +static LIST_HEAD(watch_events);
    3.37 +static DEFINE_SPINLOCK(watch_events_lock);
    3.38 +
    3.39 +/*
    3.40 + * Details of the xenwatch callback kernel thread. The thread waits on the
    3.41 + * watch_events_waitq for work to do (queued on watch_events list). When it
    3.42 + * wakes up it acquires the xenwatch_mutex before reading the list and
    3.43 + * carrying out work.
    3.44 + */
    3.45 +static pid_t xenwatch_pid;
    3.46 +static DECLARE_MUTEX(xenwatch_mutex);
    3.47 +static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
    3.48  
    3.49  static int get_error(const char *errorstring)
    3.50  {
    3.51 @@ -105,14 +119,14 @@ static void *read_reply(enum xsd_sockmsg
    3.52  
    3.53  	while (list_empty(&xs_state.reply_list)) {
    3.54  		spin_unlock(&xs_state.reply_lock);
    3.55 -		wait_event(xs_state.reply_waitq,
    3.56 -			   !list_empty(&xs_state.reply_list));
    3.57 +		wait_event_interruptible(xs_state.reply_waitq,
    3.58 +					 !list_empty(&xs_state.reply_list));
    3.59  		spin_lock(&xs_state.reply_lock);
    3.60  	}
    3.61  
    3.62  	msg = list_entry(xs_state.reply_list.next,
    3.63 -			 struct xs_stored_msg, u.reply.list);
    3.64 -	list_del(&msg->u.reply.list);
    3.65 +			 struct xs_stored_msg, list);
    3.66 +	list_del(&msg->list);
    3.67  
    3.68  	spin_unlock(&xs_state.reply_lock);
    3.69  
    3.70 @@ -606,6 +620,7 @@ EXPORT_SYMBOL(register_xenbus_watch);
    3.71  
    3.72  void unregister_xenbus_watch(struct xenbus_watch *watch)
    3.73  {
    3.74 +	struct xs_stored_msg *msg, *tmp;
    3.75  	char token[sizeof(watch) * 2 + 1];
    3.76  	int err;
    3.77  
    3.78 @@ -626,8 +641,22 @@ void unregister_xenbus_watch(struct xenb
    3.79  
    3.80  	up_read(&xs_state.suspend_mutex);
    3.81  
    3.82 -	/* Make sure watch is not in use. */
    3.83 -	flush_workqueue(watches_workq);
    3.84 +	/* Cancel pending watch events. */
    3.85 +	spin_lock(&watch_events_lock);
    3.86 +	list_for_each_entry_safe(msg, tmp, &watch_events, list) {
    3.87 +		if (msg->u.watch.handle != watch)
    3.88 +			continue;
    3.89 +		list_del(&msg->list);
    3.90 +		kfree(msg->u.watch.vec);
    3.91 +		kfree(msg);
    3.92 +	}
    3.93 +	spin_unlock(&watch_events_lock);
    3.94 +
    3.95 +	/* Flush any currently-executing callback, unless we are it. :-) */
    3.96 +	if (current->pid != xenwatch_pid) {
    3.97 +		down(&xenwatch_mutex);
    3.98 +		up(&xenwatch_mutex);
    3.99 +	}
   3.100  }
   3.101  EXPORT_SYMBOL(unregister_xenbus_watch);
   3.102  
   3.103 @@ -653,16 +682,35 @@ void xs_resume(void)
   3.104  	up_write(&xs_state.suspend_mutex);
   3.105  }
   3.106  
   3.107 -static void xenbus_fire_watch(void *arg)
   3.108 +static int xenwatch_thread(void *unused)
   3.109  {
   3.110 -	struct xs_stored_msg *msg = arg;
   3.111 +	struct list_head *ent;
   3.112 +	struct xs_stored_msg *msg;
   3.113 +
   3.114 +	for (;;) {
   3.115 +		wait_event_interruptible(watch_events_waitq,
   3.116 +					 !list_empty(&watch_events));
   3.117 +
   3.118 +		down(&xenwatch_mutex);
   3.119  
   3.120 -	msg->u.watch.handle->callback(msg->u.watch.handle,
   3.121 -				      (const char **)msg->u.watch.vec,
   3.122 -				      msg->u.watch.vec_size);
   3.123 +		spin_lock(&watch_events_lock);
   3.124 +		ent = watch_events.next;
   3.125 +		if (ent != &watch_events)
   3.126 +			list_del(ent);
   3.127 +		spin_unlock(&watch_events_lock);
   3.128  
   3.129 -	kfree(msg->u.watch.vec);
   3.130 -	kfree(msg);
   3.131 +		if (ent != &watch_events) {
   3.132 +			msg = list_entry(ent, struct xs_stored_msg, list);
   3.133 +			msg->u.watch.handle->callback(
   3.134 +				msg->u.watch.handle,
   3.135 +				(const char **)msg->u.watch.vec,
   3.136 +				msg->u.watch.vec_size);
   3.137 +			kfree(msg->u.watch.vec);
   3.138 +			kfree(msg);
   3.139 +		}
   3.140 +
   3.141 +		up(&xenwatch_mutex);
   3.142 +	}
   3.143  }
   3.144  
   3.145  static int process_msg(void)
   3.146 @@ -696,8 +744,6 @@ static int process_msg(void)
   3.147  	body[msg->hdr.len] = '\0';
   3.148  
   3.149  	if (msg->hdr.type == XS_WATCH_EVENT) {
   3.150 -		INIT_WORK(&msg->u.watch.work, xenbus_fire_watch, msg);
   3.151 -
   3.152  		msg->u.watch.vec = split(body, msg->hdr.len,
   3.153  					 &msg->u.watch.vec_size);
   3.154  		if (IS_ERR(msg->u.watch.vec)) {
   3.155 @@ -709,7 +755,10 @@ static int process_msg(void)
   3.156  		msg->u.watch.handle = find_watch(
   3.157  			msg->u.watch.vec[XS_WATCH_TOKEN]);
   3.158  		if (msg->u.watch.handle != NULL) {
   3.159 -			queue_work(watches_workq, &msg->u.watch.work);
   3.160 +			spin_lock(&watch_events_lock);
   3.161 +			list_add_tail(&msg->list, &watch_events);
   3.162 +			wake_up(&watch_events_waitq);
   3.163 +			spin_unlock(&watch_events_lock);
   3.164  		} else {
   3.165  			kfree(msg->u.watch.vec);
   3.166  			kfree(msg);
   3.167 @@ -718,7 +767,7 @@ static int process_msg(void)
   3.168  	} else {
   3.169  		msg->u.reply.body = body;
   3.170  		spin_lock(&xs_state.reply_lock);
   3.171 -		list_add_tail(&msg->u.reply.list, &xs_state.reply_list);
   3.172 +		list_add_tail(&msg->list, &xs_state.reply_list);
   3.173  		spin_unlock(&xs_state.reply_lock);
   3.174  		wake_up(&xs_state.reply_waitq);
   3.175  	}
   3.176 @@ -726,7 +775,7 @@ static int process_msg(void)
   3.177  	return 0;
   3.178  }
   3.179  
   3.180 -static int read_thread(void *unused)
   3.181 +static int xenbus_thread(void *unused)
   3.182  {
   3.183  	int err;
   3.184  
   3.185 @@ -741,7 +790,7 @@ static int read_thread(void *unused)
   3.186  int xs_init(void)
   3.187  {
   3.188  	int err;
   3.189 -	struct task_struct *reader;
   3.190 +	struct task_struct *task;
   3.191  
   3.192  	INIT_LIST_HEAD(&xs_state.reply_list);
   3.193  	spin_lock_init(&xs_state.reply_lock);
   3.194 @@ -755,13 +804,14 @@ int xs_init(void)
   3.195  	if (err)
   3.196  		return err;
   3.197  
   3.198 -	/* Create our own workqueue for executing watch callbacks. */
   3.199 -	watches_workq = create_singlethread_workqueue("xenwatch");
   3.200 -	BUG_ON(watches_workq == NULL);
   3.201 +	task = kthread_run(xenwatch_thread, NULL, "xenwatch");
   3.202 +	if (IS_ERR(task))
   3.203 +		return PTR_ERR(task);
   3.204 +	xenwatch_pid = task->pid;
   3.205  
   3.206 -	reader = kthread_run(read_thread, NULL, "xenbus");
   3.207 -	if (IS_ERR(reader))
   3.208 -		return PTR_ERR(reader);
   3.209 +	task = kthread_run(xenbus_thread, NULL, "xenbus");
   3.210 +	if (IS_ERR(task))
   3.211 +		return PTR_ERR(task);
   3.212  
   3.213  	return 0;
   3.214  }