ia64/xen-unstable

changeset 8169:ca236a81729d

Implement generic notification hold-off in the ring macros
(ring.h). Move net and blk split drivers to use it.
blktap probably needs some work. :-)

Signed-off-by: Keir Fraser <keir@xensource.com>
author kaf24@firebug.cl.cam.ac.uk
date Thu Dec 01 20:23:07 2005 +0100 (2005-12-01)
parents 6b18f820f6a7
children f62f9b1732b9
files linux-2.6-xen-sparse/drivers/xen/blkback/blkback.c linux-2.6-xen-sparse/drivers/xen/blkfront/blkfront.c linux-2.6-xen-sparse/drivers/xen/blktap/blktap.c linux-2.6-xen-sparse/drivers/xen/netback/netback.c linux-2.6-xen-sparse/drivers/xen/netfront/netfront.c xen/include/public/io/blkif.h xen/include/public/io/netif.h xen/include/public/io/ring.h
line diff
     1.1 --- a/linux-2.6-xen-sparse/drivers/xen/blkback/blkback.c	Thu Dec 01 13:13:15 2005 +0000
     1.2 +++ b/linux-2.6-xen-sparse/drivers/xen/blkback/blkback.c	Thu Dec 01 20:23:07 2005 +0100
     1.3 @@ -481,6 +481,7 @@ static void make_response(blkif_t *blkif
     1.4  	blkif_response_t *resp;
     1.5  	unsigned long     flags;
     1.6  	blkif_back_ring_t *blk_ring = &blkif->blk_ring;
     1.7 +	int notify;
     1.8  
     1.9  	/* Place on the response ring for the relevant domain. */ 
    1.10  	spin_lock_irqsave(&blkif->blk_ring_lock, flags);
    1.11 @@ -488,13 +489,23 @@ static void make_response(blkif_t *blkif
    1.12  	resp->id        = id;
    1.13  	resp->operation = op;
    1.14  	resp->status    = st;
    1.15 -	wmb(); /* Ensure other side can see the response fields. */
    1.16  	blk_ring->rsp_prod_pvt++;
    1.17 -	RING_PUSH_RESPONSES(blk_ring);
    1.18 +	RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(blk_ring, notify);
    1.19  	spin_unlock_irqrestore(&blkif->blk_ring_lock, flags);
    1.20  
    1.21 -	/* Kick the relevant domain. */
    1.22 -	notify_remote_via_irq(blkif->irq);
    1.23 +	/*
    1.24 +         * Tail check for pending requests. Allows frontend to avoid
    1.25 +         * notifications if requests are already in flight (lower overheads
    1.26 +         * and promotes batching).
    1.27 +         */
    1.28 +	if (!__on_blkdev_list(blkif) &&
    1.29 +	    RING_HAS_UNCONSUMED_REQUESTS(blk_ring)) {
    1.30 +		add_to_blkdev_list_tail(blkif);
    1.31 +		maybe_trigger_blkio_schedule();
    1.32 +	}
    1.33 +
    1.34 +	if (notify)
    1.35 +		notify_remote_via_irq(blkif->irq);
    1.36  }
    1.37  
    1.38  void blkif_deschedule(blkif_t *blkif)
     2.1 --- a/linux-2.6-xen-sparse/drivers/xen/blkfront/blkfront.c	Thu Dec 01 13:13:15 2005 +0000
     2.2 +++ b/linux-2.6-xen-sparse/drivers/xen/blkfront/blkfront.c	Thu Dec 01 20:23:07 2005 +0100
     2.3 @@ -394,8 +394,17 @@ static inline void ADD_ID_TO_FREELIST(
     2.4  
     2.5  static inline void flush_requests(struct blkfront_info *info)
     2.6  {
     2.7 +	RING_IDX old_prod = info->ring.sring->req_prod;
     2.8 +
     2.9  	RING_PUSH_REQUESTS(&info->ring);
    2.10 -	notify_remote_via_irq(info->irq);
    2.11 +
    2.12 +	/*
    2.13 +         * Send new requests /then/ check if any old requests are still in
    2.14 +         * flight. If so then there is no need to send a notification.
    2.15 +         */
    2.16 +	mb();
    2.17 +	if (info->ring.sring->rsp_prod == old_prod)
    2.18 +		notify_remote_via_irq(info->irq);
    2.19  }
    2.20  
    2.21  static void kick_pending_request_queues(struct blkfront_info *info)
    2.22 @@ -631,6 +640,7 @@ static irqreturn_t blkif_int(int irq, vo
    2.23  		return IRQ_HANDLED;
    2.24  	}
    2.25  
    2.26 + again:
    2.27  	rp = info->ring.sring->rsp_prod;
    2.28  	rmb(); /* Ensure we see queued responses up to 'rp'. */
    2.29  
    2.30 @@ -666,6 +676,15 @@ static irqreturn_t blkif_int(int irq, vo
    2.31  
    2.32  	info->ring.rsp_cons = i;
    2.33  
    2.34 +	if (i != info->ring.req_prod_pvt) {
    2.35 +		int more_to_do;
    2.36 +		RING_FINAL_CHECK_FOR_RESPONSES(&info->ring, more_to_do);
    2.37 +		if (more_to_do)
    2.38 +			goto again;
    2.39 +	} else {
    2.40 +		info->ring.sring->rsp_event = i + 1;
    2.41 +	}
    2.42 +
    2.43  	kick_pending_request_queues(info);
    2.44  
    2.45  	spin_unlock_irqrestore(&blkif_io_lock, flags);
    2.46 @@ -751,9 +770,6 @@ static void blkif_recover(struct blkfron
    2.47  
    2.48  	kfree(copy);
    2.49  
    2.50 -	/* info->ring->req_prod will be set when we flush_requests().*/
    2.51 -	wmb();
    2.52 -
    2.53  	/* Kicks things back into life. */
    2.54  	flush_requests(info);
    2.55  
     3.1 --- a/linux-2.6-xen-sparse/drivers/xen/blktap/blktap.c	Thu Dec 01 13:13:15 2005 +0000
     3.2 +++ b/linux-2.6-xen-sparse/drivers/xen/blktap/blktap.c	Thu Dec 01 20:23:07 2005 +0100
     3.3 @@ -375,7 +375,7 @@ static int blktap_ioctl(struct inode *in
     3.4  static unsigned int blktap_poll(struct file *file, poll_table *wait)
     3.5  {
     3.6  	poll_wait(file, &blktap_wait, wait);
     3.7 -	if (RING_HAS_UNPUSHED_REQUESTS(&blktap_ufe_ring)) {
     3.8 +	if (blktap_ufe_ring.req_prod_pvt != blktap_ufe_ring.sring->req_prod) {
     3.9  		flush_tlb_all();
    3.10  		RING_PUSH_REQUESTS(&blktap_ufe_ring);
    3.11  		return POLLIN | POLLRDNORM;
     4.1 --- a/linux-2.6-xen-sparse/drivers/xen/netback/netback.c	Thu Dec 01 13:13:15 2005 +0000
     4.2 +++ b/linux-2.6-xen-sparse/drivers/xen/netback/netback.c	Thu Dec 01 20:23:07 2005 +0100
     4.3 @@ -44,9 +44,6 @@ static mmu_update_t rx_mmu[NET_RX_RING_S
     4.4  static gnttab_transfer_t grant_rx_op[MAX_PENDING_REQS];
     4.5  static unsigned char rx_notify[NR_IRQS];
     4.6  
     4.7 -/* Don't currently gate addition of an interface to the tx scheduling list. */
     4.8 -#define tx_work_exists(_if) (1)
     4.9 -
    4.10  static unsigned long mmap_vstart;
    4.11  #define MMAP_VADDR(_req) (mmap_vstart + ((_req) * PAGE_SIZE))
    4.12  
    4.13 @@ -377,25 +374,22 @@ static void add_to_net_schedule_list_tai
    4.14   * aggressive in avoiding new-packet notifications -- frontend only needs to
    4.15   * send a notification if there are no outstanding unreceived responses.
     4.16  * If we may be buffering transmit buffers for any reason then we must be rather
    4.17 - * more conservative and advertise that we are 'sleeping' this connection here.
    4.18 + * more conservative and treat this as the final check for pending work.
    4.19   */
    4.20  void netif_schedule_work(netif_t *netif)
    4.21  {
    4.22 -	if (RING_HAS_UNCONSUMED_REQUESTS(&netif->tx)) {
    4.23 +	int more_to_do;
    4.24 +
    4.25 +#ifdef CONFIG_XEN_NETDEV_PIPELINED_TRANSMITTER
    4.26 +	more_to_do = RING_HAS_UNCONSUMED_REQUESTS(&netif->tx);
    4.27 +#else
    4.28 +	RING_FINAL_CHECK_FOR_REQUESTS(&netif->tx, more_to_do);
    4.29 +#endif
    4.30 +
    4.31 +	if (more_to_do) {
    4.32  		add_to_net_schedule_list_tail(netif);
    4.33  		maybe_schedule_tx_action();
    4.34  	}
    4.35 -#ifndef CONFIG_XEN_NETDEV_PIPELINED_TRANSMITTER
    4.36 -	else {
    4.37 -		netif->tx.sring->server_is_sleeping = 1;
    4.38 -		mb();
    4.39 -		if (RING_HAS_UNCONSUMED_REQUESTS(&netif->tx)) {
    4.40 -			netif->tx.sring->server_is_sleeping = 0;
    4.41 -			add_to_net_schedule_list_tail(netif);
    4.42 -			maybe_schedule_tx_action();
    4.43 -		}
    4.44 -	}
    4.45 -#endif
    4.46  }
    4.47  
    4.48  void netif_deschedule_work(netif_t *netif)
    4.49 @@ -447,26 +441,6 @@ inline static void net_tx_action_dealloc
    4.50          
    4.51  		pending_ring[MASK_PEND_IDX(pending_prod++)] = pending_idx;
    4.52  
    4.53 -		/*
    4.54 -		 * Scheduling checks must happen after the above response is
    4.55 -		 * posted. This avoids a possible race with a guest OS on
    4.56 -		 * another CPU if that guest is testing against 'resp_prod'
    4.57 -		 * when deciding whether to notify us when it queues additional
    4.58 -                 * packets.
    4.59 -		 */
    4.60 -		mb();
    4.61 -
    4.62 -		if (RING_HAS_UNCONSUMED_REQUESTS(&netif->tx)) {
    4.63 -			add_to_net_schedule_list_tail(netif);
    4.64 -		} else {
    4.65 -			netif->tx.sring->server_is_sleeping = 1;
    4.66 -			mb();
    4.67 -			if (RING_HAS_UNCONSUMED_REQUESTS(&netif->tx)) {
    4.68 -				netif->tx.sring->server_is_sleeping = 0;
    4.69 -				add_to_net_schedule_list_tail(netif);
    4.70 -			}
    4.71 -		}
    4.72 -
    4.73  		netif_put(netif);
    4.74  	}
    4.75  }
    4.76 @@ -482,7 +456,7 @@ static void net_tx_action(unsigned long 
    4.77  	RING_IDX i;
    4.78  	gnttab_map_grant_ref_t *mop;
    4.79  	unsigned int data_len;
    4.80 -	int ret;
    4.81 +	int ret, work_to_do;
    4.82  
    4.83  	if (dealloc_cons != dealloc_prod)
    4.84  		net_tx_action_dealloc();
    4.85 @@ -496,8 +470,8 @@ static void net_tx_action(unsigned long 
    4.86  		netif_get(netif);
    4.87  		remove_from_net_schedule_list(netif);
    4.88  
    4.89 -		/* Work to do? */
    4.90 -		if (!RING_HAS_UNCONSUMED_REQUESTS(&netif->tx)) {
    4.91 +		RING_FINAL_CHECK_FOR_REQUESTS(&netif->tx, work_to_do);
    4.92 +		if (!work_to_do) {
    4.93  			netif_put(netif);
    4.94  			continue;
    4.95  		}
    4.96 @@ -695,10 +669,8 @@ static void netif_page_release(struct pa
    4.97  irqreturn_t netif_be_int(int irq, void *dev_id, struct pt_regs *regs)
    4.98  {
    4.99  	netif_t *netif = dev_id;
   4.100 -	if (tx_work_exists(netif)) {
   4.101 -		add_to_net_schedule_list_tail(netif);
   4.102 -		maybe_schedule_tx_action();
   4.103 -	}
   4.104 +	add_to_net_schedule_list_tail(netif);
   4.105 +	maybe_schedule_tx_action();
   4.106  	return IRQ_HANDLED;
   4.107  }
   4.108  
   4.109 @@ -708,17 +680,25 @@ static void make_tx_response(netif_t *ne
   4.110  {
   4.111  	RING_IDX i = netif->tx.rsp_prod_pvt;
   4.112  	netif_tx_response_t *resp;
   4.113 +	int notify;
   4.114  
   4.115  	resp = RING_GET_RESPONSE(&netif->tx, i);
   4.116  	resp->id     = id;
   4.117  	resp->status = st;
   4.118 -	wmb();
   4.119 +
   4.120  	netif->tx.rsp_prod_pvt = ++i;
   4.121 -	RING_PUSH_RESPONSES(&netif->tx);
   4.122 +	RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&netif->tx, notify);
   4.123 +	if (notify)
   4.124 +		notify_remote_via_irq(netif->irq);
   4.125  
   4.126 -	mb(); /* Update producer before checking event threshold. */
   4.127 -	if (i == netif->tx.sring->rsp_event)
   4.128 -		notify_remote_via_irq(netif->irq);
   4.129 +#ifdef CONFIG_XEN_NETDEV_PIPELINED_TRANSMITTER
   4.130 +	if (i == netif->tx.req_cons) {
   4.131 +		int more_to_do;
   4.132 +		RING_FINAL_CHECK_FOR_REQUESTS(&netif->tx, more_to_do);
   4.133 +		if (more_to_do)
   4.134 +			add_to_net_schedule_list_tail(netif);
   4.135 +	}
   4.136 +#endif
   4.137  }
   4.138  
   4.139  static int make_rx_response(netif_t *netif, 
   4.140 @@ -730,6 +710,7 @@ static int make_rx_response(netif_t *net
   4.141  {
   4.142  	RING_IDX i = netif->rx.rsp_prod_pvt;
   4.143  	netif_rx_response_t *resp;
   4.144 +	int notify;
   4.145  
   4.146  	resp = RING_GET_RESPONSE(&netif->rx, i);
   4.147  	resp->offset     = offset;
   4.148 @@ -738,12 +719,11 @@ static int make_rx_response(netif_t *net
   4.149  	resp->status     = (s16)size;
   4.150  	if (st < 0)
   4.151  		resp->status = (s16)st;
   4.152 -	wmb();
   4.153 +
   4.154  	netif->rx.rsp_prod_pvt = ++i;
   4.155 -	RING_PUSH_RESPONSES(&netif->rx);
   4.156 +	RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&netif->rx, notify);
   4.157  
   4.158 -	mb(); /* Update producer before checking event threshold. */
   4.159 -	return (i == netif->rx.sring->rsp_event);
   4.160 +	return notify;
   4.161  }
   4.162  
   4.163  static irqreturn_t netif_be_dbg(int irq, void *dev_id, struct pt_regs *regs)
     5.1 --- a/linux-2.6-xen-sparse/drivers/xen/netfront/netfront.c	Thu Dec 01 13:13:15 2005 +0000
     5.2 +++ b/linux-2.6-xen-sparse/drivers/xen/netfront/netfront.c	Thu Dec 01 20:23:07 2005 +0100
     5.3 @@ -616,6 +616,7 @@ static int network_start_xmit(struct sk_
     5.4  	RING_IDX i;
     5.5  	grant_ref_t ref;
     5.6  	unsigned long mfn;
     5.7 +	int notify;
     5.8  
     5.9  	if (unlikely(np->tx_full)) {
    5.10  		printk(KERN_ALERT "%s: full queue wasn't stopped!\n",
    5.11 @@ -661,9 +662,10 @@ static int network_start_xmit(struct sk_
    5.12  	tx->size = skb->len;
    5.13  	tx->csum_blank = (skb->ip_summed == CHECKSUM_HW);
    5.14  
    5.15 -	wmb(); /* Ensure that backend will see the request. */
    5.16  	np->tx.req_prod_pvt = i + 1;
    5.17 -	RING_PUSH_REQUESTS(&np->tx);
    5.18 +	RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&np->tx, notify);
    5.19 +	if (notify)
    5.20 +		notify_remote_via_irq(np->irq);
    5.21  
    5.22  	network_tx_buf_gc(dev);
    5.23  
    5.24 @@ -677,13 +679,6 @@ static int network_start_xmit(struct sk_
    5.25  	np->stats.tx_bytes += skb->len;
    5.26  	np->stats.tx_packets++;
    5.27  
    5.28 -	/* Only notify Xen if we really have to. */
    5.29 -	mb();
    5.30 -	if (np->tx.sring->server_is_sleeping) {
    5.31 -		np->tx.sring->server_is_sleeping = 0;
    5.32 -		notify_remote_via_irq(np->irq);
    5.33 -	}
    5.34 -
    5.35  	return 0;
    5.36  
    5.37   drop:
    5.38 @@ -761,7 +756,8 @@ static int netif_poll(struct net_device 
    5.39  					rx->id, rx->status);
    5.40  			RING_GET_REQUEST(&np->rx, np->rx.req_prod_pvt)->id =
    5.41  				rx->id;
    5.42 -			wmb();
    5.43 +			RING_GET_REQUEST(&np->rx, np->rx.req_prod_pvt)->gref =
    5.44 +				ref;
    5.45  			np->rx.req_prod_pvt++;
    5.46  			RING_PUSH_REQUESTS(&np->rx);
    5.47  			work_done--;
    5.48 @@ -882,14 +878,9 @@ static int netif_poll(struct net_device 
    5.49  	if (work_done < budget) {
    5.50  		local_irq_save(flags);
    5.51  
    5.52 -		np->rx.sring->rsp_event = i + 1;
    5.53 -    
    5.54 -		/* Deal with hypervisor racing our resetting of rx_event. */
    5.55 -		mb();
    5.56 -		if (np->rx.sring->rsp_prod == i) {
    5.57 +		RING_FINAL_CHECK_FOR_RESPONSES(&np->rx, more_to_do);
    5.58 +		if (!more_to_do)
    5.59  			__netif_rx_complete(dev);
    5.60 -			more_to_do = 0;
    5.61 -		}
    5.62  
    5.63  		local_irq_restore(flags);
    5.64  	}
    5.65 @@ -930,7 +921,6 @@ static void network_connect(struct net_d
    5.66  
    5.67  	/* Step 1: Reinitialise variables. */
    5.68  	np->tx_full = 0;
    5.69 -	np->rx.sring->rsp_event = np->tx.sring->rsp_event = 1;
    5.70  
    5.71  	/*
    5.72  	 * Step 2: Rebuild the RX and TX ring contents.
    5.73 @@ -972,7 +962,7 @@ static void network_connect(struct net_d
    5.74  		np->stats.tx_bytes += skb->len;
    5.75  		np->stats.tx_packets++;
    5.76  	}
    5.77 -	wmb();
    5.78 +
    5.79  	np->tx.req_prod_pvt = requeue_idx;
    5.80  	RING_PUSH_REQUESTS(&np->tx);
    5.81  
    5.82 @@ -987,7 +977,7 @@ static void network_connect(struct net_d
    5.83  		RING_GET_REQUEST(&np->rx, requeue_idx)->id = i;
    5.84  		requeue_idx++; 
    5.85  	}
    5.86 -	wmb();                
    5.87 +
    5.88  	np->rx.req_prod_pvt = requeue_idx;
    5.89  	RING_PUSH_REQUESTS(&np->rx);
    5.90  
    5.91 @@ -998,7 +988,6 @@ static void network_connect(struct net_d
    5.92  	 * packets.
    5.93  	 */
    5.94  	np->backend_state = BEST_CONNECTED;
    5.95 -	wmb();
    5.96  	notify_remote_via_irq(np->irq);
    5.97  	network_tx_buf_gc(dev);
    5.98  
     6.1 --- a/xen/include/public/io/blkif.h	Thu Dec 01 13:13:15 2005 +0000
     6.2 +++ b/xen/include/public/io/blkif.h	Thu Dec 01 20:23:07 2005 +0100
     6.3 @@ -11,6 +11,19 @@
     6.4  
     6.5  #include "ring.h"
     6.6  
     6.7 +/*
     6.8 + * Front->back notifications: When enqueuing a new request, there is no
     6.9 + * need to send a notification if there are old requests still in flight
    6.10 + * (that is, old_req_prod != sring->rsp_prod). The backend guarantees to check
    6.11 + * for new requests after queuing the response for the last in-flight request.
    6.12 + * (NB. The generic req_event mechanism is not used for blk requests.)
    6.13 + * 
    6.14 + * Back->front notifications: When enqueuing a new response, sending a
    6.15 + * notification can be made conditional on rsp_event (i.e., the generic
    6.16 + * hold-off mechanism provided by the ring macros). Frontends must set
    6.17 + * rsp_event appropriately (e.g., using RING_FINAL_CHECK_FOR_RESPONSES()).
    6.18 + */
    6.19 +
    6.20  #ifndef blkif_vdev_t
    6.21  #define blkif_vdev_t   uint16_t
    6.22  #endif
     7.1 --- a/xen/include/public/io/netif.h	Thu Dec 01 13:13:15 2005 +0000
     7.2 +++ b/xen/include/public/io/netif.h	Thu Dec 01 20:23:07 2005 +0100
     7.3 @@ -11,6 +11,13 @@
     7.4  
     7.5  #include "ring.h"
     7.6  
     7.7 +/*
     7.8 + * Note that there is *never* any need to notify the backend when enqueuing
     7.9 + * receive requests (netif_rx_request_t). Notifications after enqueuing any
    7.10 + * other type of message should be conditional on the appropriate req_event
    7.11 + * or rsp_event field in the shared ring.
    7.12 + */
    7.13 +
    7.14  typedef struct netif_tx_request {
    7.15      grant_ref_t gref;      /* Reference to buffer page */
    7.16      uint16_t offset:15;    /* Offset within buffer page */
     8.1 --- a/xen/include/public/io/ring.h	Thu Dec 01 13:13:15 2005 +0000
     8.2 +++ b/xen/include/public/io/ring.h	Thu Dec 01 20:23:07 2005 +0100
     8.3 @@ -1,10 +1,10 @@
     8.4 -
     8.5 -
     8.6 -
     8.7 -/*
     8.8 +/******************************************************************************
     8.9 + * ring.h
    8.10 + * 
    8.11   * Shared producer-consumer ring macros.
    8.12 + *
    8.13   * Tim Deegan and Andrew Warfield November 2004.
    8.14 - */ 
    8.15 + */
    8.16  
    8.17  #ifndef __XEN_PUBLIC_IO_RING_H__
    8.18  #define __XEN_PUBLIC_IO_RING_H__
    8.19 @@ -28,32 +28,35 @@ typedef unsigned int RING_IDX;
    8.20      (__RD32(((_sz) - (long)&(_s)->ring + (long)(_s)) / sizeof((_s)->ring[0])))
    8.21  
    8.22  /*
    8.23 - *  Macros to make the correct C datatypes for a new kind of ring.
    8.24 + * Macros to make the correct C datatypes for a new kind of ring.
    8.25   * 
    8.26 - *  To make a new ring datatype, you need to have two message structures,
    8.27 - *  let's say request_t, and response_t already defined.
    8.28 + * To make a new ring datatype, you need to have two message structures,
    8.29 + * let's say request_t, and response_t already defined.
    8.30   *
    8.31 - *  In a header where you want the ring datatype declared, you then do:
    8.32 + * In a header where you want the ring datatype declared, you then do:
    8.33   *
    8.34   *     DEFINE_RING_TYPES(mytag, request_t, response_t);
    8.35   *
    8.36 - *  These expand out to give you a set of types, as you can see below.
    8.37 - *  The most important of these are:
    8.38 + * These expand out to give you a set of types, as you can see below.
    8.39 + * The most important of these are:
    8.40   *  
    8.41   *     mytag_sring_t      - The shared ring.
    8.42   *     mytag_front_ring_t - The 'front' half of the ring.
    8.43   *     mytag_back_ring_t  - The 'back' half of the ring.
    8.44   *
    8.45 - *  To initialize a ring in your code you need to know the location and size
    8.46 - *  of the shared memory area (PAGE_SIZE, for instance). To initialise
    8.47 - *  the front half:
    8.48 + * To initialize a ring in your code you need to know the location and size
    8.49 + * of the shared memory area (PAGE_SIZE, for instance). To initialise
    8.50 + * the front half:
    8.51   *
    8.52 - *      mytag_front_ring_t front_ring;
    8.53 + *     mytag_front_ring_t front_ring;
    8.54 + *     SHARED_RING_INIT((mytag_sring_t *)shared_page);
    8.55 + *     FRONT_RING_INIT(&front_ring, (mytag_sring_t *)shared_page, PAGE_SIZE);
    8.56   *
    8.57 - *      SHARED_RING_INIT((mytag_sring_t *)shared_page);
    8.58 - *      FRONT_RING_INIT(&front_ring, (mytag_sring_t *)shared_page, PAGE_SIZE);
    8.59 + * Initializing the back follows similarly (note that only the front
    8.60 + * initializes the shared ring):
    8.61   *
    8.62 - *  Initializing the back follows similarly...
    8.63 + *     mytag_back_ring_t back_ring;
    8.64 + *     BACK_RING_INIT(&back_ring, (mytag_sring_t *)shared_page, PAGE_SIZE);
    8.65   */
    8.66           
    8.67  #define DEFINE_RING_TYPES(__name, __req_t, __rsp_t)                     \
    8.68 @@ -66,10 +69,8 @@ union __name##_sring_entry {            
    8.69                                                                          \
    8.70  /* Shared ring page */                                                  \
    8.71  struct __name##_sring {                                                 \
    8.72 -    RING_IDX req_prod;                                                  \
    8.73 -    RING_IDX rsp_prod;                                                  \
    8.74 -    RING_IDX rsp_event; /* notify client when rsp_prod == rsp_event */  \
    8.75 -    uint8_t  server_is_sleeping; /* notify server to kick off work  */  \
    8.76 +    RING_IDX req_prod, req_event;                                       \
    8.77 +    RING_IDX rsp_prod, rsp_event;                                       \
    8.78      union __name##_sring_entry ring[1]; /* variable-length */           \
    8.79  };                                                                      \
    8.80                                                                          \
    8.81 @@ -95,24 +96,24 @@ typedef struct __name##_front_ring __nam
    8.82  typedef struct __name##_back_ring __name##_back_ring_t
    8.83  
    8.84  /*
    8.85 - *   Macros for manipulating rings.  
    8.86 + * Macros for manipulating rings.  
    8.87   * 
    8.88 - *   FRONT_RING_whatever works on the "front end" of a ring: here 
    8.89 - *   requests are pushed on to the ring and responses taken off it.
    8.90 + * FRONT_RING_whatever works on the "front end" of a ring: here 
    8.91 + * requests are pushed on to the ring and responses taken off it.
    8.92   * 
    8.93 - *   BACK_RING_whatever works on the "back end" of a ring: here 
    8.94 - *   requests are taken off the ring and responses put on.
    8.95 + * BACK_RING_whatever works on the "back end" of a ring: here 
    8.96 + * requests are taken off the ring and responses put on.
    8.97   * 
    8.98 - *   N.B. these macros do NO INTERLOCKS OR FLOW CONTROL.  
    8.99 - *   This is OK in 1-for-1 request-response situations where the 
   8.100 - *   requestor (front end) never has more than RING_SIZE()-1
   8.101 - *   outstanding requests.
   8.102 + * N.B. these macros do NO INTERLOCKS OR FLOW CONTROL.  
   8.103 + * This is OK in 1-for-1 request-response situations where the 
   8.104 + * requestor (front end) never has more than RING_SIZE()-1
   8.105 + * outstanding requests.
   8.106   */
   8.107  
   8.108  /* Initialising empty rings */
   8.109  #define SHARED_RING_INIT(_s) do {                                       \
   8.110 -    (_s)->req_prod = 0;                                                 \
   8.111 -    (_s)->rsp_prod = 0;                                                 \
   8.112 +    (_s)->req_prod  = (_s)->rsp_prod  = 0;                              \
   8.113 +    (_s)->req_event = (_s)->rsp_event = 1;                              \
   8.114  } while(0)
   8.115  
   8.116  #define FRONT_RING_INIT(_r, _s, __size) do {                            \
   8.117 @@ -148,10 +149,6 @@ typedef struct __name##_back_ring __name
   8.118  #define RING_SIZE(_r)                                                   \
   8.119      ((_r)->nr_ents)
   8.120  
   8.121 -/* How many empty slots are on a ring? */
   8.122 -#define RING_PENDING_REQUESTS(_r)                                       \
   8.123 -   ( ((_r)->req_prod_pvt - (_r)->rsp_cons) )
   8.124 -   
   8.125  /* Test if there is an empty slot available on the front ring. 
   8.126   * (This is only meaningful from the front. )
   8.127   */
   8.128 @@ -167,25 +164,6 @@ typedef struct __name##_back_ring __name
   8.129       (((_r)->req_cons - (_r)->rsp_prod_pvt) !=                          \
   8.130        RING_SIZE(_r)) )
   8.131        
   8.132 -/* Test if there are messages waiting to be pushed. */
   8.133 -#define RING_HAS_UNPUSHED_REQUESTS(_r)                                  \
   8.134 -   ( (_r)->req_prod_pvt != (_r)->sring->req_prod )
   8.135 -   
   8.136 -#define RING_HAS_UNPUSHED_RESPONSES(_r)                                 \
   8.137 -   ( (_r)->rsp_prod_pvt != (_r)->sring->rsp_prod )
   8.138 -
   8.139 -/* Copy the private producer pointer into the shared ring so the other end 
   8.140 - * can see the updates we've made. */
   8.141 -#define RING_PUSH_REQUESTS(_r) do {                                     \
   8.142 -    wmb();                                                              \
   8.143 -    (_r)->sring->req_prod = (_r)->req_prod_pvt;                         \
   8.144 -} while (0)
   8.145 -
   8.146 -#define RING_PUSH_RESPONSES(_r) do {                                    \
   8.147 -    wmb();                                                              \
   8.148 -    (_r)->sring->rsp_prod = (_r)->rsp_prod_pvt;                         \
   8.149 -} while (0)
   8.150 -
   8.151  /* Direct access to individual ring elements, by index. */
   8.152  #define RING_GET_REQUEST(_r, _idx)                                      \
   8.153   (&((_r)->sring->ring[                                                  \
   8.154 @@ -201,6 +179,82 @@ typedef struct __name##_back_ring __name
   8.155  #define RING_REQUEST_CONS_OVERFLOW(_r, _cons)                           \
   8.156      (((_cons) - (_r)->rsp_prod_pvt) >= RING_SIZE(_r))
   8.157  
   8.158 +#define RING_PUSH_REQUESTS(_r) do {                                     \
   8.159 +    wmb(); /* back sees requests /before/ updated producer index */     \
   8.160 +    (_r)->sring->req_prod = (_r)->req_prod_pvt;                         \
   8.161 +} while (0)
   8.162 +
   8.163 +#define RING_PUSH_RESPONSES(_r) do {                                    \
   8.164 +    wmb(); /* front sees responses /before/ updated producer index */   \
   8.165 +    (_r)->sring->rsp_prod = (_r)->rsp_prod_pvt;                         \
   8.166 +} while (0)
   8.167 +
   8.168 +/*
   8.169 + * Notification hold-off (req_event and rsp_event):
   8.170 + * 
   8.171 + * When queueing requests or responses on a shared ring, it may not always be
   8.172 + * necessary to notify the remote end. For example, if requests are in flight
   8.173 + * in a backend, the front may be able to queue further requests without
   8.174 + * notifying the back (if the back checks for new requests when it queues
   8.175 + * responses).
   8.176 + * 
   8.177 + * When enqueuing requests or responses:
   8.178 + * 
   8.179 + *  Use RING_PUSH_{REQUESTS,RESPONSES}_AND_CHECK_NOTIFY(). The second argument
   8.180 + *  is a boolean return value. True indicates that the receiver requires an
   8.181 + *  asynchronous notification.
   8.182 + * 
   8.183 + * After dequeuing requests or responses (before sleeping the connection):
   8.184 + * 
   8.185 + *  Use RING_FINAL_CHECK_FOR_REQUESTS() or RING_FINAL_CHECK_FOR_RESPONSES().
   8.186 + *  The second argument is a boolean return value. True indicates that there
   8.187 + *  are pending messages on the ring (i.e., the connection should not be put
   8.188 + *  to sleep).
   8.189 + *  
   8.190 + *  These macros will set the req_event/rsp_event field to trigger a
   8.191 + *  notification on the very next message that is enqueued. If you want to
   8.192 + *  create batches of work (i.e., only receive a notification after several
   8.193 + *  messages have been enqueued) then you will need to create a customised
   8.194 + *  version of the FINAL_CHECK macro in your own code, which sets the event
   8.195 + *  field appropriately.
   8.196 + */
   8.197 +
   8.198 +#define RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(_r, _notify) do {           \
   8.199 +    RING_IDX __old = (_r)->sring->req_prod;                             \
   8.200 +    RING_IDX __new = (_r)->req_prod_pvt;                                \
   8.201 +    wmb(); /* back sees requests /before/ updated producer index */     \
   8.202 +    (_r)->sring->req_prod = __new;                                      \
   8.203 +    mb(); /* back sees new requests /before/ we check req_event */      \
   8.204 +    (_notify) = ((RING_IDX)(__new - (_r)->sring->req_event) <           \
   8.205 +                 (RING_IDX)(__new - __old));                            \
   8.206 +} while (0)
   8.207 +
   8.208 +#define RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(_r, _notify) do {          \
   8.209 +    RING_IDX __old = (_r)->sring->rsp_prod;                             \
   8.210 +    RING_IDX __new = (_r)->rsp_prod_pvt;                                \
   8.211 +    wmb(); /* front sees responses /before/ updated producer index */   \
   8.212 +    (_r)->sring->rsp_prod = __new;                                      \
   8.213 +    mb(); /* front sees new responses /before/ we check rsp_event */    \
   8.214 +    (_notify) = ((RING_IDX)(__new - (_r)->sring->rsp_event) <           \
   8.215 +                 (RING_IDX)(__new - __old));                            \
   8.216 +} while (0)
   8.217 +
   8.218 +#define RING_FINAL_CHECK_FOR_REQUESTS(_r, _work_to_do) do {             \
   8.219 +    (_work_to_do) = RING_HAS_UNCONSUMED_REQUESTS(_r);                   \
   8.220 +    if (_work_to_do) break;                                             \
   8.221 +    (_r)->sring->req_event = (_r)->req_cons + 1;                        \
   8.222 +    mb();                                                               \
   8.223 +    (_work_to_do) = RING_HAS_UNCONSUMED_REQUESTS(_r);                   \
   8.224 +} while (0)
   8.225 +
   8.226 +#define RING_FINAL_CHECK_FOR_RESPONSES(_r, _work_to_do) do {            \
   8.227 +    (_work_to_do) = RING_HAS_UNCONSUMED_RESPONSES(_r);                  \
   8.228 +    if (_work_to_do) break;                                             \
   8.229 +    (_r)->sring->rsp_event = (_r)->rsp_cons + 1;                        \
   8.230 +    mb();                                                               \
   8.231 +    (_work_to_do) = RING_HAS_UNCONSUMED_RESPONSES(_r);                  \
   8.232 +} while (0)
   8.233 +
   8.234  #endif /* __XEN_PUBLIC_IO_RING_H__ */
   8.235  
   8.236  /*