From: Ross Philipson Date: Wed, 21 Oct 2009 18:15:48 +0000 (-0400) Subject: Add v2v async implementation. X-Git-Url: http://xenbits.xensource.com/gitweb?a=commitdiff_plain;h=526f4d389fb4eba4f76599795005d23f36f3ddec;p=xenclient%2Flinux-2.6.27-pq.git Add v2v async implementation. This patch applies over the v2v-core patch and adds the asynchronous functionality to the base code and test driver. It also fixes a number of other bugs and issues in the core v2v code so they should be used together. Changes to be committed: modified: master/series new file: master/v2v-async --- diff --git a/master/series b/master/series index b9c5a62..9d164dd 100644 --- a/master/series +++ b/master/series @@ -354,3 +354,4 @@ bridge-carrier-follows-prio0.patch bsg-add-global-sgio-mutex.patch itpm v2v-core +v2v-async diff --git a/master/v2v-async b/master/v2v-async new file mode 100644 index 0000000..77b32ed --- /dev/null +++ b/master/v2v-async @@ -0,0 +1,2350 @@ +diff --git a/drivers/xen/v2v/v2v.c b/drivers/xen/v2v/v2v.c +index 8f6efd4..57d51be 100644 +--- a/drivers/xen/v2v/v2v.c ++++ b/drivers/xen/v2v/v2v.c +@@ -3,8 +3,8 @@ + * + * V2V interdomain communication driver. + * +- * Copyright (c) 2009 Steven Smith + * Copyright (c) 2009 Ross Philipson ++ * Copyright (c) 2009 Steven Smith + * Copyright (c) 2009 Citrix Systems, Inc. + * + * This program is free software; you can redistribute it and/or +@@ -36,6 +36,7 @@ + #include + #include + #include ++#include + #include + #include + #include +@@ -67,6 +68,9 @@ struct v2v_channel { + + int receive_evtchn_irq; + int send_evtchn_irq; ++ ++ /* async values */ ++ struct v2v_async asv; + + unsigned nr_prod_ring_pages; + unsigned nr_cons_ring_pages; +@@ -77,15 +81,17 @@ struct v2v_channel { + + int has_watch; + int is_temple; ++ int is_sync; + + union { + struct { + int has_grant_alloc; ++ int accepted; + grant_ref_t gref_head; + + grant_ref_t prod_grefs[MAX_RING_PAGES]; + grant_ref_t cons_grefs[MAX_RING_PAGES]; +- grant_ref_t control_gref; ++ grant_ref_t control_gref; + } temple; + struct { + struct vm_struct *prod_area; +@@ -203,12 +209,23 @@ v2v_remote_state_changed(struct xenbus_watch *watch, + return; /* not much we can do */ + + v2v_set_event(channel); ++ ++ /* Callback interested parties that regitered a control callback */ ++ if (channel->is_temple && !channel->u.temple.accepted) ++ return; ++ if (channel->asv.control_cb) ++ channel->asv.control_cb(channel->asv.control_ctx); + } + + static irqreturn_t + send_int(int irq, void *dev_id) + { + struct v2v_channel *channel = dev_id; ++ ++ if (!channel->is_sync) { ++ channel->asv.send_int(channel->asv.send_ctx); ++ return IRQ_HANDLED; ++ } + + if (v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND)) + return IRQ_HANDLED; /* not much we can do */ +@@ -223,6 +240,11 @@ receive_int(int irq, void *dev_id) + { + struct v2v_channel *channel = dev_id; + ++ if (!channel->is_sync) { ++ channel->asv.receive_int(channel->asv.receive_ctx); ++ return IRQ_HANDLED; ++ } ++ + if (v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE)) + return IRQ_HANDLED; /* not much we can do */ + +@@ -231,6 +253,26 @@ receive_int(int irq, void *dev_id) + return IRQ_HANDLED; + } + ++static inline void ++v2v_close_receive_evtchn(struct v2v_channel *channel) ++{ ++ DPRINTK("closing receive evtchn irg: %d\n", channel->receive_evtchn_irq); ++ if (channel->receive_evtchn_irq != 0) { ++ unbind_from_irqhandler(channel->receive_evtchn_irq, channel); ++ channel->receive_evtchn_irq = 0; ++ } ++} ++ ++static inline void ++v2v_close_send_evtchn(struct v2v_channel *channel) ++{ ++ DPRINTK("closing send evtchn irg: %d\n", channel->send_evtchn_irq); ++ if (channel->send_evtchn_irq != 0) { ++ unbind_from_irqhandler(channel->send_evtchn_irq, channel); ++ channel->send_evtchn_irq = 0; ++ } ++} ++ + static void + v2v_destroy_channel(const struct v2v_channel *_chan, int free_temple) + { +@@ -308,10 +350,9 @@ v2v_destroy_channel(const struct v2v_channel *_chan, int free_temple) + DPRINTK("freed grant refs and rings for temple.\n"); + } + +- if (chan->receive_evtchn_irq != 0) +- unbind_from_irqhandler(chan->receive_evtchn_irq, chan); +- if (chan->send_evtchn_irq != 0) +- unbind_from_irqhandler(chan->send_evtchn_irq, chan); ++ v2v_close_receive_evtchn(chan); ++ v2v_close_send_evtchn(chan); ++ + DPRINTK("unbound irq handlers.\n"); + + /* cleanup anything in chan->wait_entry_list once the evtchns are shutdown */ +@@ -340,7 +381,7 @@ v2v_read_peer_domid(struct v2v_channel *chan) + } + + static struct v2v_channel * +-v2v_make_channel(const char *xenbus_prefix) ++v2v_make_channel(const char *xenbus_prefix, struct v2v_async *async_values) + { + struct v2v_channel *chan; + +@@ -357,9 +398,17 @@ v2v_make_channel(const char *xenbus_prefix) + strcpy(chan->local_prefix, xenbus_prefix); + + init_waitqueue_head(&chan->wait_state.wait_event); ++ atomic_set(&chan->wait_state.wait_condition, 0); + spin_lock_init(&chan->wait_list_lock); + INIT_LIST_HEAD(&chan->wait_entry_list); + ++ if (async_values) { ++ memcpy(&chan->asv, async_values, sizeof(struct v2v_async)); ++ chan->is_sync = 0; ++ } ++ else ++ chan->is_sync = 1; ++ + DPRINTK("created channel: %p with local prefix - %s\n", chan, chan->local_prefix); + + return chan; +@@ -479,7 +528,8 @@ v2v_change_local_state(struct v2v_channel *channel, + + int + v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel, +- unsigned prod_ring_page_order, unsigned cons_ring_page_order) ++ unsigned prod_ring_page_order, unsigned cons_ring_page_order, ++ struct v2v_async *async_values) + { + unsigned prod_ring_size = PAGE_SIZE << prod_ring_page_order; + unsigned cons_ring_size = PAGE_SIZE << cons_ring_page_order; +@@ -499,9 +549,15 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel, + return -EINVAL; + } + ++ if (async_values && ++ (!async_values->receive_int || !async_values->send_int)) { ++ EPRINTK("v2v_listen - invalid async arguments\n"); ++ return -EINVAL; ++ } ++ + *channel = NULL; + +- chan = v2v_make_channel(xenbus_prefix); ++ chan = v2v_make_channel(xenbus_prefix, async_values); + if (!chan) { + EPRINTK("v2v_listen - out of memory making channel\n"); + return -ENOMEM; +@@ -587,8 +643,7 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel, + gnttab_grant_foreign_access_ref(ref, chan->peer_domid, mfn, 0); + DPRINTK("created control grant ref\n"); + +- err = +- bind_listening_port_to_irqhandler(chan->peer_domid, receive_int, 0, "v2v", chan); ++ err = bind_listening_port_to_irqhandler(chan->peer_domid, receive_int, 0, "v2v", chan); + if (err < 0) { + EPRINTK("bind_listening_port_to_irqhandler(receive) failed - err: %d\n", err); + goto err_out; +@@ -596,8 +651,7 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel, + chan->receive_evtchn_irq = err; + DPRINTK("created listening port for receive: %d\n", chan->receive_evtchn_irq); + +- err = +- bind_listening_port_to_irqhandler(chan->peer_domid, send_int, 0, "v2v", chan); ++ err = bind_listening_port_to_irqhandler(chan->peer_domid, send_int, 0, "v2v", chan); + if (err < 0) { + EPRINTK("bind_listening_port_to_irqhandler(send) failed - err: %d\n", err); + goto err_out; +@@ -664,10 +718,8 @@ v2v_listen(const char *xenbus_prefix, struct v2v_channel **channel, + chan->u.temple.control_gref); + chan->u.temple.control_gref = GRANT_INVALID_REF; + +- unbind_from_irqhandler(chan->receive_evtchn_irq, chan); +- chan->receive_evtchn_irq = 0; +- unbind_from_irqhandler(chan->send_evtchn_irq, chan); +- chan->send_evtchn_irq = 0; ++ v2v_close_receive_evtchn(chan); ++ v2v_close_send_evtchn(chan); + + /* undo what v2v_connect_channel_xenbus did */ + unregister_xenbus_watch(&chan->remote_state_watch); +@@ -785,7 +837,7 @@ v2v_accept(struct v2v_channel *channel) + } + err = xenbus_transaction_end(xbt, 0); + if (err == 0) +- return 0; ++ goto final; /* success */ + if (err != -EAGAIN) { + EPRINTK("v2v_accept - error commiting xs transaction - err: %d\n", err); + return err; +@@ -794,6 +846,9 @@ v2v_accept(struct v2v_channel *channel) + } + } + ++final: ++ channel->u.temple.accepted = 1; ++ + /* the initial state of the send event is set to wakeup for send since there + is always send room on startup of the rings */ + err = v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND); +@@ -808,7 +863,8 @@ v2v_accept(struct v2v_channel *channel) + EXPORT_SYMBOL_GPL(v2v_accept); + + int +-v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel) ++v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel, ++ struct v2v_async *async_values) + { + int err = 0; + struct xenbus_transaction xbt = {0}; +@@ -825,10 +881,16 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel) + return -EINVAL; + } + ++ if (async_values && ++ (!async_values->receive_int || !async_values->send_int)) { ++ EPRINTK("v2v_connect - invalid async arguments\n"); ++ return -EINVAL; ++ } ++ + *channel = NULL; + + for (;;) { +- chan = v2v_make_channel(xenbus_prefix); ++ chan = v2v_make_channel(xenbus_prefix, async_values); + if (!chan) { + EPRINTK("v2v_connect - out of memory making channel\n"); + return -ENOMEM; +@@ -970,9 +1032,9 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel) + chan->control = chan->u.supplicant.control_area->addr; + DPRINTK("mapped controle grant ref\n"); + +- err = +- bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.prod_evtchn_port, +- receive_int, 0, "v2v", chan); ++ err = bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, ++ chan->u.supplicant.prod_evtchn_port, ++ receive_int, 0, "v2v", chan); + if (err < 0) { + EPRINTK("bind_interdomain_evtchn_to_irqhandler(receive_int) failed - err: %d\n", err); + goto err_out; +@@ -980,9 +1042,9 @@ v2v_connect(const char *xenbus_prefix, struct v2v_channel **channel) + chan->receive_evtchn_irq = err; + DPRINTK("bound interdomain port for receive: %d\n", chan->receive_evtchn_irq); + +- err = +- bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, chan->u.supplicant.cons_evtchn_port, +- send_int, 0, "v2v", chan); ++ err = bind_interdomain_evtchn_to_irqhandler(chan->peer_domid, ++ chan->u.supplicant.cons_evtchn_port, ++ send_int, 0, "v2v", chan); + if (err < 0) { + EPRINTK("bind_interdomain_evtchn_to_irqhandler(send_int) failed - err: %d\n", err); + goto err_out; +@@ -1052,6 +1114,8 @@ v2v_disconnect_temple(const struct v2v_channel *_channel) + err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnecting); + if (err) + return err; ++ ++ channel->u.temple.accepted = 0; + + /* Get the other end to disconnect */ + for (;;) { +@@ -1167,14 +1231,8 @@ v2v_disconnect_temple(const struct v2v_channel *_channel) + if (!any_failed) + gnttab_free_grant_references(channel->u.temple.gref_head); + +- if (channel->receive_evtchn_irq != 0) { +- unbind_from_irqhandler(channel->receive_evtchn_irq, channel); +- channel->receive_evtchn_irq = 0; +- } +- if (channel->send_evtchn_irq != 0) { +- unbind_from_irqhandler(channel->send_evtchn_irq, channel); +- channel->send_evtchn_irq = 0; +- } ++ v2v_close_receive_evtchn(channel); ++ v2v_close_send_evtchn(channel); + + DPRINTK("finished disconnecting temple, channel: %p\n", channel); + +@@ -1197,14 +1255,8 @@ v2v_disconnect_supplicant(const struct v2v_channel *_channel) + channel->cons_sring = NULL; + channel->control = NULL; + +- if (channel->receive_evtchn_irq != 0) { +- unbind_from_irqhandler(channel->receive_evtchn_irq, channel); +- channel->receive_evtchn_irq = 0; +- } +- if (channel->send_evtchn_irq != 0) { +- unbind_from_irqhandler(channel->send_evtchn_irq, channel); +- channel->send_evtchn_irq = 0; +- } ++ v2v_close_receive_evtchn(channel); ++ v2v_close_send_evtchn(channel); + + err = v2v_change_local_state(channel, XBT_NIL, v2v_state_disconnected); + if (err) { +@@ -1239,6 +1291,11 @@ EXPORT_SYMBOL_GPL(v2v_get_wait_state); + u8 + v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons) + { ++ if (!channel->is_sync) { ++ if ((reasons & (V2V_WAKE_REASON_SEND|V2V_WAKE_REASON_RECEIVE)) != 0) ++ return -EINVAL; ++ } ++ + return v2v_wrq_dequeue(channel, reasons); + } + EXPORT_SYMBOL_GPL(v2v_get_wake_reason); +@@ -1248,6 +1305,11 @@ v2v_set_wake_reason(struct v2v_channel *channel, u8 reason) + { + int err; + ++ if (!channel->is_sync) { ++ if ((reason == V2V_WAKE_REASON_SEND)||(reason == V2V_WAKE_REASON_RECEIVE)) ++ return -EINVAL; ++ } ++ + err = v2v_wrq_queue(channel, reason); + if (err) + return err; +@@ -1299,12 +1361,15 @@ retry: + goto retry; + } + /* ### clear the event (reset) */ +- v2v_wrq_clear(channel, V2V_WAKE_REASON_RECEIVE); ++ if (channel->is_sync) ++ v2v_wrq_clear(channel, V2V_WAKE_REASON_RECEIVE); + + if (nc2_final_check_for_messages(&channel->nc2_rings, prod)) { + /* ### now set the event */ +- v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE); +- v2v_set_event(channel); ++ if (channel->is_sync) { ++ v2v_wrq_queue(channel, V2V_WAKE_REASON_RECEIVE); ++ v2v_set_event(channel); ++ } + goto retry; + } + return -ENODATA; +@@ -1378,14 +1443,17 @@ v2v_nc2_prep_message(struct v2v_channel *channel, + v2v_nc2_send_messages(channel); + if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size)) { + /* ### clear the event (reset) */ +- v2v_wrq_clear(channel, V2V_WAKE_REASON_SEND); ++ if (channel->is_sync) ++ v2v_wrq_clear(channel, V2V_WAKE_REASON_SEND); + + if (!nc2_can_send_payload_bytes(&channel->nc2_rings, rounded_size)) + return -EAGAIN; + + /* ### now set the event */ +- v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND); +- v2v_set_event(channel); ++ if (channel->is_sync) { ++ v2v_wrq_queue(channel, V2V_WAKE_REASON_SEND); ++ v2v_set_event(channel); ++ } + } + __nc2_avoid_ring_wrap(&channel->nc2_rings, rounded_size); + hdr = __nc2_get_message_ptr(&channel->nc2_rings); +@@ -1499,4 +1567,3 @@ module_init(v2v_init); + module_exit(v2v_cleanup); + + MODULE_LICENSE("Dual BSD/GPL"); +- +diff --git a/drivers/xen/v2v/v2v_private.h b/drivers/xen/v2v/v2v_private.h +index c623a2d..4af02ee 100644 +--- a/drivers/xen/v2v/v2v_private.h ++++ b/drivers/xen/v2v/v2v_private.h +@@ -70,14 +70,15 @@ void v2v_xenops_grant_unmap(struct vm_struct *vm_area, + #define INVALID_GRANT_HANDLE ((grant_handle_t)~0U) + #define GRANT_INVALID_REF 0 + +-#define DPRINTK(fmt, args...) \ +- pr_debug("v2v (%s:%d) " fmt, \ ++#define DPRINTK(fmt, args...) \ ++ pr_debug("v2v-dbg (%s:%d) " fmt, \ + __FUNCTION__, __LINE__, ##args) +-#define IPRINTK(fmt, args...) \ ++ ++#define IPRINTK(fmt, args...) \ + printk(KERN_INFO "v2v: " fmt, ##args) +-#define WPRINTK(fmt, args...) \ ++#define WPRINTK(fmt, args...) \ + printk(KERN_WARNING "v2v: " fmt, ##args) +-#define EPRINTK(fmt, args...) \ ++#define EPRINTK(fmt, args...) \ + printk(KERN_ERR "v2v: " fmt, ##args) + + #endif /* !V2V_PRIVATE_H__ */ +diff --git a/drivers/xen/v2v/v2vdrv.c b/drivers/xen/v2v/v2vdrv.c +index 0c362bb..dfa8a43 100644 +--- a/drivers/xen/v2v/v2vdrv.c ++++ b/drivers/xen/v2v/v2vdrv.c +@@ -49,11 +49,13 @@ + + static const char v2vdrv_usage[] = \ + "Run as either the listener or connector by writing the following information:\n" \ +- "listener,\n" \ +- "connector,,[count],[size],[timout]\n" \ ++ "listener,,[async],[fastrx]\n" \ ++ "connector,,[async],[fastrx],[count],[size],[timout]\n" \ + " local_prefix: XenStore local path for the V2V endpoint\n" \ +- " count: (opt) number of messages to send\n" \ +- " size: (opt) size of each message\n" \ ++ " async: (opt) set to 1 to run in async mode, 0 for sync mode\n" \ ++ " fastrx: (opt) set to 1 to use fast rx mode (on with sync mode)\n" \ ++ " count: (opt) number of messages to send\n" \ ++ " size: (opt) size of each message\n" \ + " timout: (opt) timeout in ms for awaiting response\n" \ + "Also a xenstore reader test routine:\n" \ + "reader,[:value]\n"; +@@ -61,13 +63,6 @@ static const char v2vdrv_usage[] = \ + extern void v2vdrv_run_connector(struct v2vdrv_config *config); + extern void v2vdrv_run_listener(struct v2vdrv_config *config); + +-enum v2vdrv_role { +- role_unknown = 0, +- role_listener, +- role_connector, +- role_reader +-}; +- + static void v2vdrv_run_reader(struct v2vdrv_config *config) + { + char *path = NULL; +@@ -127,37 +122,40 @@ out: + kfree(value); + } + +-static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_config *config) ++static void ++v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_config *config) + { + size_t len; +- enum v2vdrv_role role = role_unknown; + int i, err, val, parsed; + ++ config->role = role_unknown; + len = strlen(cfgstr); +- ++ + do { + if ((len > 9)&&(strncmp(cfgstr, "listener,", 9) == 0)) { + cfgstr += 9; + len -= 9; +- role = role_listener; ++ config->role = role_listener; + } + else if ((len > 10)&&(strncmp(cfgstr, "connector,", 10) == 0)) { + cfgstr += 10; + len -= 10; +- role = role_connector; ++ config->role = role_connector; + } + else if ((len > 7)&&(strncmp(cfgstr, "reader,", 7) == 0)) { + cfgstr += 7; + len -= 7; +- role = role_reader; ++ config->role = role_reader; + } + else + break; +- ++ + /* some defaults */ + config->xfer_count = 0; /* connect only, no data */ + config->xfer_size = 512; +- config->timeout = V2VDRV_RESPONSE_WAIT_TIMEOUT; ++ config->timeout = msecs_to_jiffies((const unsigned long)V2VDRV_RESPONSE_WAIT_TIMEOUT); ++ config->async = 0; /* default to sync mode */ ++ config->fastrx = 0; /* default to no fastrx mode */ + + for (i = 0; i < len; i++) + if (cfgstr[i] == ',') +@@ -166,29 +164,50 @@ static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_co + /* this is our local prefix */ + config->local_prefix = kmalloc(i + 1, GFP_KERNEL); + if (!config->local_prefix) { +- role = role_unknown; ++ config->role = role_unknown; + break; + } + strncpy(config->local_prefix, cfgstr, i); + config->local_prefix[i] = '\0'; + +- if (role == role_listener || role == role_reader) ++ if (config->role == role_reader) + break; + + if (i == len) /* no opt args */ + break; + +- /* else this is a connector with more opt args */ ++ /* else this is a listener/connector with more opt args */ + cfgstr += i + 1; + parsed = 0; + err = sscanf(cfgstr, "%d,%n", &val, &parsed); + if (err < 1) + break; +- ++ config->async = val; ++ if (parsed == 0) ++ break; ++ ++ cfgstr += parsed; ++ parsed = 0; ++ err = sscanf(cfgstr, "%d,%n", &val, &parsed); ++ if (err < 1) ++ break; ++ config->fastrx = (uint32_t)val; ++ if (parsed == 0) ++ break; ++ ++ /* all others are connector only */ ++ if (config->role == role_listener) ++ break; ++ ++ cfgstr += parsed; ++ parsed = 0; ++ err = sscanf(cfgstr, "%d,%n", &val, &parsed); ++ if (err < 1) ++ break; + config->xfer_count = (uint32_t)val; + if (parsed == 0) + break; +- ++ + cfgstr += parsed; + parsed = 0; + err = sscanf(cfgstr, "%d,%n", &val, &parsed); +@@ -205,8 +224,6 @@ static enum v2vdrv_role v2vdrv_parse_config(const char *cfgstr, struct v2vdrv_co + break; + config->timeout = msecs_to_jiffies((const unsigned long)val); + } while (0); +- +- return role; + } + + static ssize_t v2vdrv_read(struct file *file, char __user *buf, +@@ -235,7 +252,6 @@ static ssize_t v2vdrv_write(struct file *file, const char __user *buf, + { + char *cfgstr = NULL; + struct v2vdrv_config *config = NULL; +- enum v2vdrv_role role; + size_t written = -EFAULT; + + if (count == 0) +@@ -262,21 +278,31 @@ static ssize_t v2vdrv_write(struct file *file, const char __user *buf, + if (cfgstr[count - 1] == '\n') + cfgstr[count - 1] = '\0'; + +- role = v2vdrv_parse_config(cfgstr, config); ++ v2vdrv_parse_config(cfgstr, config); ++ ++ /* input checking */ ++ if (config->async && config->fastrx) { ++ printk("V2V-DRV WARNING cannot use async and fastrx mode together; disabling fastrx.\n"); ++ config->fastrx = 0; ++ } + +- if (role == role_listener) { ++ if (config->role == role_listener) { + printk("V2V-DRV loaded listener config...\n"); + printk(" local prefix: %s\n", config->local_prefix); ++ printk(" async: %d\n", config->async); ++ printk(" fastrx: %d\n", config->fastrx); + v2vdrv_run_listener(config); + } +- else if (role == role_connector) { ++ else if (config->role == role_connector) { + printk("V2V-DRV loaded connector config...\n"); + printk(" local prefix: %s\n", config->local_prefix); ++ printk(" async: %d\n", config->async); ++ printk(" fastrx: %d\n", config->fastrx); + printk(" count: %d size: %d timeout: %lu\n", + config->xfer_count, config->xfer_size, config->timeout); + v2vdrv_run_connector(config); + } +- else if (role == role_reader) { ++ else if (config->role == role_reader) { + printk("V2V-DRV loaded reader config...\n"); + printk(" local prefix: %s\n", config->local_prefix); + v2vdrv_run_reader(config); +diff --git a/drivers/xen/v2v/v2vdrv.h b/drivers/xen/v2v/v2vdrv.h +index 8c3cd06..c695176 100644 +--- a/drivers/xen/v2v/v2vdrv.h ++++ b/drivers/xen/v2v/v2vdrv.h +@@ -48,6 +48,13 @@ + #define V2V_MESSAGE_STATUS_NODATA 0xFFFFF102 + #define V2V_MESSAGE_STATUS_WRITE_ERR 0xFFFFF103 + ++enum v2vdrv_role { ++ role_unknown = 0, ++ role_listener, ++ role_connector, ++ role_reader ++}; ++ + struct v2vdrv_frame_header { + uint16_t id; + uint8_t type; +@@ -74,9 +81,12 @@ struct v2vdrv_listener_resp_item { + + struct v2vdrv_config { + char *local_prefix; ++ enum v2vdrv_role role; + uint32_t xfer_count; + uint32_t xfer_size; + unsigned long timeout; ++ int async; ++ int fastrx; + }; + + static inline uint8_t v2vdrv_checksum(const uint8_t *ptr, uint32_t length) +diff --git a/drivers/xen/v2v/v2vops.c b/drivers/xen/v2v/v2vops.c +index 0c2798c..83854a2 100644 +--- a/drivers/xen/v2v/v2vops.c ++++ b/drivers/xen/v2v/v2vops.c +@@ -1,7 +1,7 @@ + /****************************************************************************** + * drivers/xen/v2v/v2vops.c + * +- * V2V sample client driver synchronous operations. ++ * V2V sample client driver operations. + * + * Copyright (c) 2009 Ross Philipson + * Copyright (c) 2009 Citrix Systems, Inc. +@@ -39,6 +39,7 @@ + #include + #include + #include ++#include + #include + #include + #include +@@ -46,38 +47,118 @@ + #include "v2vdrv.h" + + /************************** GENERAL **************************/ ++#define V2V_WAKE_REASON_TERMINATE V2V_WAKE_REASON_USER1 ++#define V2V_MAX_FASTRX_SIZE 1024 ++#define MIN(x, y) ((x) < (y) ? (x) : (y)) ++ ++#define V2V_TERM_UNKNOWN (uint32_t)-1 ++#define V2V_TERM_COMPLETE 0x0 ++#define V2V_TERM_GENERAL_ERROR 0x1 ++#define V2V_TERM_RX_ERROR 0x2 ++#define V2V_TERM_TX_ERROR 0x3 ++#define V2V_TERM_TIMEOUT 0x4 ++ ++struct v2vdrv_context { ++ struct v2vdrv_config *config; ++ struct v2v_channel *channel; ++ struct v2v_async *asvp; ++ uint32_t tx_counter; ++ uint32_t rx_counter; ++ ++ union { ++ struct { ++ struct v2vdrv_listener_resp_item *resp_list; ++ struct v2vdrv_listener_resp_item *resp_tail; ++ } listener; ++ struct { ++ int reserved; ++ } connector; ++ } r; ++ ++ union { ++ struct { ++ struct v2v_async asv; ++ struct work_struct rx_work; ++ struct work_struct tx_work; ++ struct timer_list to_timer; ++ spinlock_t rx_lock; ++ spinlock_t tx_lock; ++ atomic_t running; ++ uint32_t term_status; ++ } async; ++ struct { ++ int reserved; ++ } sync; ++ } s; ++}; ++ ++static void v2vdrv_async_init(struct v2vdrv_context *ctx); ++ ++static int ++v2vdrv_input_sanity_check(struct v2vdrv_context *ctx) ++{ ++ if (ctx->config->role == role_connector) { ++ if (ctx->config->xfer_size <= sizeof(struct v2vdrv_post_internal)) { ++ printk("%s (%s) transfer size %d (0x%x) too small; %d (0x%x) required\n", V2VDRV_LOGTAG, ++ "connector", ctx->config->xfer_size, ctx->config->xfer_size, ++ sizeof(struct v2vdrv_post_internal) + 1, sizeof(struct v2vdrv_post_internal) + 1); ++ return 0; ++ } ++ } ++ /* listener always sends fixed sized messages and doesn use xfer_size */ ++ ++ return 1; ++} ++ + static int + v2vdrv_message_header_check(const char *rstr, + struct v2vdrv_frame_header *header, + size_t msg_size, +- size_t min_size, +- uint32_t rx_counter) ++ size_t min_size) + { + if ((msg_size < sizeof(struct v2vdrv_frame_header))||(msg_size < min_size)) { +- printk("%s (%s) response #%d is too small!!!\n", V2VDRV_LOGTAG, rstr, rx_counter); ++ printk("%s (%s) response is too small!!!\n", V2VDRV_LOGTAG, rstr); + return 0; + } + if (header->length < msg_size) { +- printk("%s (%s) response #%d header length incorrect!!!\n", V2VDRV_LOGTAG, rstr, rx_counter); ++ printk("%s (%s) response header length incorrect!!!\n", V2VDRV_LOGTAG, rstr); + return 0; + } + +- printk("%s (%s) received message #%d\n", V2VDRV_LOGTAG, rstr, rx_counter); ++ printk("%s (%s) received message\n", V2VDRV_LOGTAG, rstr); + printk("------ id=%d type=%d length=0x%x\n", header->id, header->type, header->length); + + return 1; + } + ++static int ++v2vdrv_status_check(struct v2vdrv_context *ctx, const char *rstr) ++{ ++ int err; ++ enum v2v_endpoint_state state; ++ ++ err = v2v_get_remote_state(ctx->channel, &state); ++ if (err) { ++ printk("%s %s(%p) failure in v2v_get_remote_state(); aborting - error: %d\n", ++ V2VDRV_LOGTAG, rstr, ctx, err); ++ return 1; /* done with error */ ++ } ++ printk("%s %s(%p) state changed for other end - new state: %s\n", ++ V2VDRV_LOGTAG, rstr, ctx, v2v_endpoint_state_name(state)); ++ if (v2v_state_requests_disconnect(state)) { ++ printk("%s %s(%p) main processing loop ending for disconnect request...\n", ++ V2VDRV_LOGTAG, rstr, ctx); ++ return 1; /* done */ ++ } ++ return 0; ++} ++ + /************************* CONNECTOR *************************/ +-struct v2vdrv_connector_context { +- struct v2vdrv_config *config; +- struct v2v_channel *channel; +- uint32_t tx_counter; +- uint32_t rx_counter; +-}; ++static void v2vdrv_connector_process_messages_sync(struct v2vdrv_context *ctx); ++static void v2vdrv_connector_process_messages_async(struct v2vdrv_context *ctx); + + static int +-v2vdrv_connect(struct v2vdrv_connector_context *vcc) ++v2vdrv_connect(struct v2vdrv_context *ctx) + { + int err = 0; + enum v2v_endpoint_state state; +@@ -87,18 +168,18 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc) + struct timespec ts = {0}, tsz = {0}, now, delta; + + /* Connect to the listener, get back a channel handle */ +- err = v2v_connect(vcc->config->local_prefix, &vcc->channel); ++ err = v2v_connect(ctx->config->local_prefix, &ctx->channel, ctx->asvp); + if (err) { +- printk("%s connector(%p) failure in v2v_connect() - error: %d\n", V2VDRV_LOGTAG, vcc, err); ++ printk("%s connector(%p) failure in v2v_connect() - error: %d\n", V2VDRV_LOGTAG, ctx, err); + return err; + } + +- BUG_ON(vcc->channel == NULL); ++ BUG_ON(ctx->channel == NULL); + +- printk("%s connector(%p) connected to listener; wait for listenter to indicate it has accepted the connection...\n", V2VDRV_LOGTAG, vcc); ++ printk("%s connector(%p) connected to listener; wait for listenter to indicate it has accepted the connection...\n", V2VDRV_LOGTAG, ctx); + +- wait_state = v2v_get_wait_state(vcc->channel); +- to = vcc->config->timeout << 2; /* in jiffies x4*/ ++ wait_state = v2v_get_wait_state(ctx->channel); ++ to = ctx->config->timeout << 2; /* in jiffies x4*/ + + do { + if (!timespec_equal(&ts, &tsz)) { +@@ -120,24 +201,24 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc) + to); + if (rc == 0) { + printk("%s connector(%p) timed out waiting for accept from listener; disconnecting\n", +- V2VDRV_LOGTAG, vcc); ++ V2VDRV_LOGTAG, ctx); + err = -ETIMEDOUT; + break; + } + +- reasons = v2v_get_wake_reason(vcc->channel, V2V_WAKE_REASON_CONTROL); ++ reasons = v2v_get_wake_reason(ctx->channel, V2V_WAKE_REASON_CONTROL); + if (reasons & V2V_WAKE_REASON_CONTROL) { +- err = v2v_get_remote_state(vcc->channel, &state); ++ err = v2v_get_remote_state(ctx->channel, &state); + if (err) { + printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n", +- V2VDRV_LOGTAG, vcc, err); ++ V2VDRV_LOGTAG, ctx, err); + break; + } + printk("%s connector(%p) state changed for other end - new state: %s\n", +- V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state)); ++ V2VDRV_LOGTAG, ctx, v2v_endpoint_state_name(state)); + if (state == v2v_state_connected) { + printk("%s connector(%p) listener reports connected; begin processing messages.\n", +- V2VDRV_LOGTAG, vcc); ++ V2VDRV_LOGTAG, ctx); + err = 0; + break; + } +@@ -145,31 +226,89 @@ v2vdrv_connect(struct v2vdrv_connector_context *vcc) + } while (1); + + if (err) +- v2v_disconnect(vcc->channel); ++ v2v_disconnect(ctx->channel); + + return err; + } + ++static void ++v2vdrv_connector_disconnect(struct v2vdrv_context *ctx) ++{ ++ int err; ++ ++ printk("%s connector(%p) Disconnecting...\n", V2VDRV_LOGTAG, ctx); ++ err = v2v_disconnect(ctx->channel); ++ printk("%s connector(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, ctx, err); ++ ++ printk("%s connector(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter); ++ printk("%s connector(%p) Received response counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter); ++ ++ if (ctx->tx_counter != ctx->rx_counter) ++ printk("%s connector(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, ctx); ++} ++ ++void v2vdrv_run_connector(struct v2vdrv_config *config) ++{ ++ struct v2vdrv_context *ctx; ++ int err; ++ ++ ctx = kmalloc(sizeof(struct v2vdrv_context), GFP_KERNEL); ++ if (!ctx) { ++ printk("%s connector out of memory\n", V2VDRV_LOGTAG); ++ return; ++ } ++ memset(ctx, 0, sizeof(struct v2vdrv_context)); ++ ctx->config = config; ++ ++ if (config->async) ++ v2vdrv_async_init(ctx); ++ ++ if (!v2vdrv_input_sanity_check(ctx)) { ++ kfree(ctx); ++ return; ++ } ++ ++ err = v2vdrv_connect(ctx); ++ if (err) { ++ kfree(ctx); ++ return; ++ } ++ ++ /* This runs the main processing loop, when it is done we disconnect ++ and cleanup regardless of what may have occured */ ++ if (config->async) ++ v2vdrv_connector_process_messages_async(ctx); ++ else ++ v2vdrv_connector_process_messages_sync(ctx); ++ ++ v2vdrv_connector_disconnect(ctx); ++ ++ kfree(ctx); ++} ++ ++/********************** CONNECTOR SYNC ***********************/ + static int +-v2vdrv_connector_process_internal_rx(struct v2vdrv_connector_context *vcc) ++v2vdrv_connector_process_internal_sync_rx(struct v2vdrv_context *ctx) + { + int err; + volatile void *msg; + size_t size; +- unsigned type; +- unsigned flags; ++ unsigned vtype; ++ unsigned vflags; + struct v2vdrv_frame_header *header; + struct v2vdrv_resp_internal *vri; + uint8_t sum; + +- while ((err = v2v_nc2_get_message(vcc->channel, (const volatile void **)&msg, &size, &type, &flags)) ++ if (ctx->config->fastrx) ++ v2v_nc2_request_fast_receive(ctx->channel); ++ ++ while ((err = v2v_nc2_get_message(ctx->channel, (const volatile void **)&msg, &size, &vtype, &vflags)) + == 0) { +- vcc->rx_counter++; ++ ctx->rx_counter++; + header = (struct v2vdrv_frame_header*)msg; + if (!v2vdrv_message_header_check("connector", header, size, +- sizeof(struct v2vdrv_resp_internal), +- vcc->rx_counter)) { +- v2v_nc2_finish_message(vcc->channel); ++ sizeof(struct v2vdrv_resp_internal))) { ++ v2v_nc2_finish_message(ctx->channel); + return -EBADMSG; + } + +@@ -184,97 +323,107 @@ v2vdrv_connector_process_internal_rx(struct v2vdrv_connector_context *vcc) + + sum = v2vdrv_checksum((const uint8_t*)msg, header->length); + if (sum != 0) +- printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter); ++ printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter); + +- v2v_nc2_finish_message(vcc->channel); ++ v2v_nc2_finish_message(ctx->channel); + } ++ if (ctx->config->fastrx) ++ v2v_nc2_cancel_fast_receive(ctx->channel); ++ + if (err == -ENODATA) { + /* No more messages */ +- printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, vcc); ++ printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx); + return 0; + } + + printk("%s connector(%p) receive internal data failure; abort processing - error: %d\n", +- V2VDRV_LOGTAG, vcc, err); ++ V2VDRV_LOGTAG, ctx, err); + return err; /* failure */ + } + + static int +-v2vdrv_connector_process_internal_tx(struct v2vdrv_connector_context *vcc) ++v2vdrv_connector_process_internal_sync_tx(struct v2vdrv_context *ctx) + { + int err; + unsigned available; + volatile void *msg; + uint8_t *msgp; ++ size_t msize; + struct v2vdrv_frame_header *header; + struct v2vdrv_post_internal *vpi; + +- printk("%s connector(%p) sending internal message #%d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1); +- available = v2v_nc2_producer_bytes_available(vcc->channel); +- printk("%s connector(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vcc, available); ++ printk("%s connector(%p) sending internal message #%d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1); ++ available = v2v_nc2_producer_bytes_available(ctx->channel); ++ printk("%s connector(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, ctx, available); ++ ++ if (ctx->config->fastrx && v2v_nc2_remote_requested_fast_wakeup(ctx->channel)) ++ msize = MIN(ctx->config->xfer_size, V2V_MAX_FASTRX_SIZE); ++ else ++ msize = ctx->config->xfer_size; + +- err = v2v_nc2_prep_message(vcc->channel, vcc->config->xfer_size, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg); ++ err = v2v_nc2_prep_message(ctx->channel, msize, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg); + if (err) { + if (err == -EAGAIN) { + /* No room right now, return and try again later */ + printk("%s connector(%p) not enough buffer space to send message #%d; retry\n", +- V2VDRV_LOGTAG, vcc, vcc->tx_counter + 1); ++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1); + return -EAGAIN; + } + printk("%s connector(%p) transmit internal data failure; abort processing - error: %d\n", +- V2VDRV_LOGTAG, vcc, err); ++ V2VDRV_LOGTAG, ctx, err); + return err; /* failure */ + } +- vcc->tx_counter++; /* next message */ ++ ctx->tx_counter++; /* next message */ + header = (struct v2vdrv_frame_header*)msg; +- header->id = (uint16_t)vcc->tx_counter; ++ header->id = (uint16_t)ctx->tx_counter; + header->type = V2V_MESSAGE_TYPE_INTERNAL; + header->cs = 0; +- header->length = vcc->config->xfer_size; ++ header->length = ctx->config->xfer_size; + vpi = (struct v2vdrv_post_internal*)msg; + generate_random_uuid(vpi->guid); + + /* Fill it up with some data and send it */ + msgp = (uint8_t*)msg + sizeof(struct v2vdrv_post_internal); +- memset(msgp, 'X', (vcc->config->xfer_size - sizeof(struct v2vdrv_post_internal))); +- header->cs = v2vdrv_checksum((const uint8_t*)msg, vcc->config->xfer_size); +- v2v_nc2_send_messages(vcc->channel); ++ memset(msgp, 'X', (ctx->config->xfer_size - sizeof(struct v2vdrv_post_internal))); ++ header->cs = v2vdrv_checksum((const uint8_t*)msg, ctx->config->xfer_size); ++ v2v_nc2_send_messages(ctx->channel); + + /* Keep the send loop going by setting the event. If there is no more room, the prep message call + will return ERROR_RETRY and just land us back in the wait. */ +- v2v_set_wake_reason(vcc->channel, V2V_WAKE_REASON_SEND); ++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND); + + return 0; + } + + static void +-v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc) ++v2vdrv_connector_process_messages_sync(struct v2vdrv_context *ctx) + { + int err = 0, wait_to = 0, done = 0; + struct v2v_wait *wait_state; + u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND; +- u8 reasons; +- enum v2v_endpoint_state state; ++ u8 reason; + unsigned long to, td, rc; + struct timespec ts = {0}, tsz = {0}, now, delta; + ++ printk("%s connector(%p) started SYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx); ++ + /* A transfer count of 0 is used to just test connecting and disconnecting + w/o sending any data */ +- if (vcc->config->xfer_count == 0) { +- printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, vcc); ++ if (ctx->config->xfer_count == 0) { ++ printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, ctx); + return; + } + +- wait_state = v2v_get_wait_state(vcc->channel); +- to = vcc->config->timeout << 2; /* in jiffies x4*/ ++ wait_state = v2v_get_wait_state(ctx->channel); ++ to = ctx->config->timeout; /* in jiffies */ + + /* Send our first file chunk to the listener to start things off */ +- err = v2vdrv_connector_process_internal_tx(vcc); ++ err = v2vdrv_connector_process_internal_sync_tx(ctx); + if (err) { + /* we should not get an -EAGAIN on first message, return it as an error */ + BUG_ON(err == -EAGAIN); /* SNO on first message */ + printk("%s connector(%p) transmit internal data failure on first message; abort processing - error: %d\n", +- V2VDRV_LOGTAG, vcc, err); ++ V2VDRV_LOGTAG, ctx, err); + return; + } + +@@ -282,10 +431,10 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc) + do { + /* When the tx counter reaches the transfer count value, stop sending and wait for + the rest of the responses */ +- if (vcc->tx_counter == vcc->config->xfer_count) { ++ if (ctx->tx_counter == ctx->config->xfer_count) { + /* First see if we are done */ +- if (vcc->rx_counter == vcc->tx_counter) { +- printk("%s connector(%p) received all remaing responses from listener; disconnecting.\n", V2VDRV_LOGTAG, vcc); ++ if (ctx->rx_counter == ctx->tx_counter) { ++ printk("%s connector(%p) received all remaing responses from listener; disconnecting.\n", V2VDRV_LOGTAG, ctx); + err = 0; + break; + } +@@ -318,50 +467,35 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc) + } + if (rc == 0) { + printk("%s connector(%p) timed out waiting for ack responses from listener; disconnecting\n", +- V2VDRV_LOGTAG, vcc); ++ V2VDRV_LOGTAG, ctx); + break; + } + + do { +- reasons = v2v_get_wake_reason(vcc->channel, reasons_mask); ++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask); + +- if (reasons & V2V_WAKE_REASON_CONTROL) { +- err = v2v_get_remote_state(vcc->channel, &state); +- if (err) { +- printk("%s connector(%p) failure in v2v_get_remote_state(); aborting - error: %d\n", +- V2VDRV_LOGTAG, vcc, err); +- done = 1; +- break; +- } +- printk("%s connector(%p) state changed for other end - new state: %s\n", +- V2VDRV_LOGTAG, vcc, v2v_endpoint_state_name(state)); +- if (v2v_state_requests_disconnect(state)) { +- printk("%s connector(%p) main processing loop ending for disconnect request...\n", +- V2VDRV_LOGTAG, vcc); +- err = 0; +- done = 1; +- break; +- } ++ if (reason & V2V_WAKE_REASON_CONTROL) { ++ done = v2vdrv_status_check(ctx, "connector"); ++ if (done) ++ break; + } +- +- if (reasons & V2V_WAKE_REASON_SEND) { +- if (vcc->tx_counter != vcc->config->xfer_count) { +- err = v2vdrv_connector_process_internal_tx(vcc); ++ else if (reason & V2V_WAKE_REASON_SEND) { ++ if (ctx->tx_counter != ctx->config->xfer_count) { ++ err = v2vdrv_connector_process_internal_sync_tx(ctx); + if ((err != 0)&&(err != -EAGAIN)) { + done = 1; + break; + } + } + } +- +- if (reasons & V2V_WAKE_REASON_RECEIVE) { +- err = v2vdrv_connector_process_internal_rx(vcc); ++ else if (reason & V2V_WAKE_REASON_RECEIVE) { ++ err = v2vdrv_connector_process_internal_sync_rx(ctx); + if (err) { + done = 1; + break; + } + } +- } while (reasons != V2V_WAKE_REASON_NONE); ++ } while (reason != V2V_WAKE_REASON_NONE); + + if (done) + break; +@@ -369,114 +503,364 @@ v2vdrv_connector_process_messages(struct v2vdrv_connector_context *vcc) + } while (1); + } + +-static void +-v2vdrv_connector_disconnect(struct v2vdrv_connector_context *vcc) ++/********************* CONNECTOR ASYNC ***********************/ ++static int ++v2vdrv_connector_process_internal_async_rx(struct v2vdrv_context *ctx) + { +- int err; ++ int err, last = 0; ++ unsigned long flags; ++ uint32_t rxc = 0; ++ volatile void *msg; ++ size_t size; ++ unsigned vtype; ++ unsigned vflags; ++ struct v2vdrv_frame_header *header; ++ struct v2vdrv_resp_internal lvri; ++ uint8_t *pguid; ++ uint8_t sum; + +- printk("%s connector(%p) Disconnecting...\n", V2VDRV_LOGTAG, vcc); +- err = v2v_disconnect(vcc->channel); +- printk("%s connector(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vcc, err); ++ do { ++ /* Critical section where we access the rings - have to lock this area */ ++ spin_lock_irqsave(&ctx->s.async.rx_lock, flags); + +- printk("%s connector(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->tx_counter); +- printk("%s connector(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vcc, vcc->rx_counter); ++ err = v2v_nc2_get_message(ctx->channel, (const volatile void **)&msg, &size, &vtype, &vflags); ++ if (err != 0) { ++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags); ++ break; ++ } + +- if (vcc->tx_counter != vcc->rx_counter) +- printk("%s connector(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vcc); ++ header = (struct v2vdrv_frame_header*)msg; ++ if (!v2vdrv_message_header_check("connector", header, size, ++ sizeof(struct v2vdrv_resp_internal))) { ++ v2v_nc2_finish_message(ctx->channel); ++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags); ++ err = -EBADMSG; ++ break; ++ } ++ ++ /* Make a local copy of the message header and run a checksum over the message */ ++ memcpy(&lvri, (void*)msg, sizeof(struct v2vdrv_resp_internal)); ++ sum = v2vdrv_checksum((const uint8_t*)msg, header->length); ++ ++ v2v_nc2_finish_message(ctx->channel); ++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags); ++ ++ /* Critical section to test counter state */ ++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags); ++ ++ /* Update counter tracking state, we just sent one */ ++ rxc = ++ctx->rx_counter; ++ ++ /* See if we are done sending and receiving and set the completion event */ ++ if ((ctx->tx_counter == ctx->config->xfer_count)&&(ctx->rx_counter == ctx->tx_counter)) { ++ ctx->s.async.term_status = V2V_TERM_COMPLETE; ++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE); ++ last = 0; ++ } ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ ++ /* Out of critical section and the buffer in the ring has been released, ++ back; can't touch the msg any longer - do some testing and tracing */ ++ printk("------ message status=%d\n", lvri.status); ++ pguid = &lvri.guid[0]; ++ printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n", ++ pguid[0], pguid[1], pguid[2], pguid[3], pguid[4], pguid[5], pguid[6], pguid[7]); ++ printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n", ++ pguid[8], pguid[9], pguid[10], pguid[11], pguid[12], pguid[13], pguid[14], pguid[15]); ++ if (sum != 0) ++ printk("%s connector(%p) bad checksumm on response #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter); ++ ++ if (last) { ++ err = 0; ++ break; ++ } ++ } while (1); ++ ++ /* If we ended the receive handler with this code, there are no more message right ++ now so we wait for an rx interrupt. If the status is success then it was the last ++ message. */ ++ if (err == -ENODATA) { ++ /* No more messages */ ++ printk("%s connector(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx); ++ return 0; ++ } ++ else if (err == 0) { ++ /* Last message received */ ++ printk("%s connector(%p) last message sent, returning\n", V2VDRV_LOGTAG, ctx); ++ return 0; ++ } ++ ++ printk("%s connector(%p) receive internal data failure; abort processing - error: %d\n", ++ V2VDRV_LOGTAG, ctx, err); ++ return err; /* failure */ + } + +-void v2vdrv_run_connector(struct v2vdrv_config *config) ++static int ++v2vdrv_connector_process_internal_async_tx(struct v2vdrv_context *ctx) + { +- struct v2vdrv_connector_context *vcc; + int err; ++ unsigned long flags; ++ uint32_t txc = 0; ++ unsigned available; ++ volatile void *msg; ++ uint8_t *msgp; ++ struct v2vdrv_frame_header *header; ++ struct v2vdrv_post_internal *vpi; ++ uint8_t guid[16]; + +- vcc = kmalloc(sizeof(struct v2vdrv_connector_context), GFP_KERNEL); +- if (!vcc) { +- printk("%s connector out of memory\n", V2VDRV_LOGTAG); ++ /* Loop and send as many messages as possible. The internal message connector does not wait for ++ responses before posting more messages. */ ++ do { ++ /* Make a GUID for the next message we send if any */ ++ generate_random_uuid(&guid[0]); ++ ++ /* Critical section where we access the rings - have to lock this area */ ++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags); ++ ++ /* See if we are done sending, if so set a timer and exit */ ++ if (ctx->tx_counter == ctx->config->xfer_count) { ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ /* Set the expiry for waiting for responses now that all messages have been sent */ ++ if (!timer_pending(&ctx->s.async.to_timer)) { ++ ctx->s.async.to_timer.expires = jiffies + ctx->config->timeout; /* in jiffies */ ++ add_timer(&ctx->s.async.to_timer); ++ printk("%s connector(%p) finished sending message, setting timer and exiting\n", V2VDRV_LOGTAG, ctx); ++ } ++ else ++ printk("%s connector(%p) finished sending message, timer already pending???\n", V2VDRV_LOGTAG, ctx); ++ ++ break; ++ } ++ ++ available = v2v_nc2_producer_bytes_available(ctx->channel); ++ err = v2v_nc2_prep_message(ctx->channel, ctx->config->xfer_size, V2V_MESSAGE_TYPE_INTERNAL, 0, &msg); ++ if (err) { ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ if (err == -EAGAIN) { ++ /* No room right now, return and try again later */ ++ printk("%s connector(%p) not enough buffer space to send message #%d; retry\n", ++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1); ++ err = 0; ++ } ++ else /* failure */ ++ printk("%s connector(%p) transmit internal data failure; abort processing - error: %d\n", ++ V2VDRV_LOGTAG, ctx, err); ++ break; ++ } ++ ++ txc = ++ctx->tx_counter; /* next message */ ++ header = (struct v2vdrv_frame_header*)msg; ++ header->id = (uint16_t)ctx->tx_counter; ++ header->type = V2V_MESSAGE_TYPE_INTERNAL; ++ header->cs = 0; ++ header->length = ctx->config->xfer_size; ++ vpi = (struct v2vdrv_post_internal*)msg; ++ memcpy(&vpi->guid[0], &guid, sizeof(guid)); ++ ++ /* Fill it up with some data and send it */ ++ msgp = (uint8_t*)msg + sizeof(struct v2vdrv_post_internal); ++ memset(msgp, 'X', (ctx->config->xfer_size - sizeof(struct v2vdrv_post_internal))); ++ header->cs = v2vdrv_checksum((const uint8_t*)msg, ctx->config->xfer_size); ++ v2v_nc2_send_messages(ctx->channel); ++ ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ ++ printk("%s connector(%p) sent internal message #%d\n", V2VDRV_LOGTAG, ctx, txc); ++ } while (1); ++ ++ return err; ++} ++ ++static void ++v2vdrv_connector_process_messages_async(struct v2vdrv_context *ctx) ++{ ++ int done = 0; ++ struct v2v_wait *wait_state; ++ u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_TERMINATE; ++ u8 reason; ++ ++ printk("%s connector(%p) started ASYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx); ++ ++ /* A transfer count of 0 is used to just test connecting and disconnecting ++ w/o sending any data */ ++ if (ctx->config->xfer_count == 0) { ++ printk("%s connector(%p) tranfer count set to 0; disconnecting.\n", V2VDRV_LOGTAG, ctx); + return; + } +- memset(vcc, 0, sizeof(struct v2vdrv_connector_context)); +- vcc->config = config; + +- err = v2vdrv_connect(vcc); +- if (err) +- return; ++ atomic_inc(&ctx->s.async.running); ++ wait_state = v2v_get_wait_state(ctx->channel); + +- /* This runs the main processing loop, when it is done we disconnect +- and cleanup regardless of what may have occured */ +- v2vdrv_connector_process_messages(vcc); ++ /* Send our first message to the listener to start things off */ ++ schedule_work(&ctx->s.async.tx_work); + +- v2vdrv_connector_disconnect(vcc); ++ /* Start out processing loop, wait for status changes */ ++ do { ++ wait_event(wait_state->wait_event, ++ atomic_xchg(&wait_state->wait_condition, 0) == 1); ++ ++ do { ++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask); ++ ++ if (reason & V2V_WAKE_REASON_CONTROL) { ++ done = v2vdrv_status_check(ctx, "connector"); ++ if (done) ++ break; ++ } ++ else if (reason & V2V_WAKE_REASON_TERMINATE) { ++ /* The terminate event may indicate an error or normal completion */ ++ switch (ctx->s.async.term_status) { ++ case V2V_TERM_COMPLETE: ++ case V2V_TERM_TIMEOUT: ++ printk("%s connector(%p) async handlers signalled a terminate for reason: %s; exiting.\n", ++ V2VDRV_LOGTAG, ctx, (ctx->s.async.term_status == V2V_TERM_COMPLETE) ? "complete" : "timeout"); ++ break; ++ default: ++ printk("%s connector(%p) async handlers signalled a terminate with error status=0x%x; exiting.\n", ++ V2VDRV_LOGTAG, ctx, ctx->s.async.term_status); ++ } ++ done = 1; ++ break; ++ } ++ } while (reason != V2V_WAKE_REASON_NONE); ++ ++ if (done) ++ break; ++ ++ } while (1); + +- kfree(vcc); ++ atomic_dec(&ctx->s.async.running); ++ del_timer_sync(&ctx->s.async.to_timer); + } + +-/************************* LISTENER *************************/ +-struct v2vdrv_listener_context { +- struct v2vdrv_config *config; +- struct v2v_channel *channel; +- uint32_t tx_counter; +- uint32_t rx_counter; +- struct v2vdrv_listener_resp_item *resp_list; +- struct v2vdrv_listener_resp_item *resp_tail; +-}; ++/************************* LISTENER **************************/ ++static void v2vdrv_listener_process_messages_sync(struct v2vdrv_context *ctx); ++static void v2vdrv_listener_process_messages_async(struct v2vdrv_context *ctx); + + static int +-v2vdrv_listen_accept(struct v2vdrv_listener_context *vlc) ++v2vdrv_listen_accept(struct v2vdrv_context *ctx) + { + int err, err2; + + /* Start the listener, get back a channel handle */ +- err = v2v_listen(vlc->config->local_prefix, &vlc->channel, 0, 0); ++ err = v2v_listen(ctx->config->local_prefix, &ctx->channel, 0, 0, ctx->asvp); + if (err) { +- printk("%s listener(%p) failure in v2v_listen() - error: %d\n", V2VDRV_LOGTAG, vlc, err); ++ printk("%s listener(%p) failure in v2v_listen() - error: %d\n", V2VDRV_LOGTAG, ctx, err); + return err; + } +- BUG_ON(vlc->channel == NULL); +- printk("%s listener(%p) listener started, wait to accept...\n", V2VDRV_LOGTAG, vlc); ++ BUG_ON(ctx->channel == NULL); ++ printk("%s listener(%p) listener started, wait to accept...\n", V2VDRV_LOGTAG, ctx); + + /* Wait to accept the connection from the connector end */ +- err = v2v_accept(vlc->channel); ++ err = v2v_accept(ctx->channel); + if (err) { + if (err != -ENOLINK) +- printk("%s listener(%p) failure in v2v_accept() - error: %d\n", V2VDRV_LOGTAG, vlc, err); ++ printk("%s listener(%p) failure in v2v_accept() - error: %d\n", V2VDRV_LOGTAG, ctx, err); + else +- printk("%s listener(%p) remote end disconnected while waiting to accept\n", V2VDRV_LOGTAG, vlc); ++ printk("%s listener(%p) remote end disconnected while waiting to accept\n", V2VDRV_LOGTAG, ctx); + +- err2 = v2v_disconnect(vlc->channel); ++ err2 = v2v_disconnect(ctx->channel); + if (err2) { + printk("%s listener(%p) secondary failure in v2v_disconnect() after accept failed - error: %d\n", +- V2VDRV_LOGTAG, vlc, err2); ++ V2VDRV_LOGTAG, ctx, err2); + } + return err; + } + +- printk("%s listener(%p) accepted connection, ready to process incoming data.\n", V2VDRV_LOGTAG, vlc); ++ printk("%s listener(%p) accepted connection, ready to process incoming data.\n", V2VDRV_LOGTAG, ctx); + + return 0; + } + ++static void ++v2vdrv_listener_disconnect(struct v2vdrv_context *ctx) ++{ ++ int err; ++ uint32_t i = 0; ++ struct v2vdrv_listener_resp_item *resp; ++ ++ printk("%s listener(%p) Disconnecting...\n", V2VDRV_LOGTAG, ctx); ++ err = v2v_disconnect(ctx->channel); ++ printk("%s listener(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, ctx, err); ++ ++ printk("%s listener(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter); ++ printk("%s listener(%p) Received response counter: %d\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter); ++ if (ctx->tx_counter != ctx->rx_counter) ++ printk("%s listener(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, ctx); ++ ++ while (ctx->r.listener.resp_list) { ++ resp = ctx->r.listener.resp_list; ++ ctx->r.listener.resp_list = resp->next; ++ kfree(resp); ++ i++; ++ } ++ if (i > 0) ++ printk("%s listener(%p) WARNING Found %d unsent responses\n", V2VDRV_LOGTAG, ctx, i); ++} ++ ++void v2vdrv_run_listener(struct v2vdrv_config *config) ++{ ++ struct v2vdrv_context *ctx; ++ int err; ++ ++ ctx = kmalloc(sizeof(struct v2vdrv_context), GFP_KERNEL); ++ if (!ctx) { ++ printk("%s listener out of memory\n", V2VDRV_LOGTAG); ++ return; ++ } ++ memset(ctx, 0, sizeof(struct v2vdrv_context)); ++ ctx->config = config; ++ ++ if (config->async) ++ v2vdrv_async_init(ctx); ++ ++ if (!v2vdrv_input_sanity_check(ctx)) { ++ kfree(ctx); ++ return; ++ } ++ ++ err = v2vdrv_listen_accept(ctx); ++ if (err) { ++ kfree(ctx); ++ return; ++ } ++ ++ /* This runs the main processing loop, when it is done we disconnect ++ and cleanup regardless of what may have occured */ ++ if (config->async) ++ v2vdrv_listener_process_messages_async(ctx); ++ else ++ v2vdrv_listener_process_messages_sync(ctx); ++ ++ v2vdrv_listener_disconnect(ctx); ++ ++ kfree(ctx); ++} ++ ++/********************** LISTENER SYNC ************************/ + static int +-v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc) ++v2vdrv_listener_process_internal_sync_rx(struct v2vdrv_context *ctx) + { + int err; + volatile void *msg; + size_t size; +- unsigned type; +- unsigned flags; ++ unsigned vtype; ++ unsigned vflags; + struct v2vdrv_frame_header *header; + struct v2vdrv_post_internal *vpi; + struct v2vdrv_listener_resp_item *vlri; + uint8_t sum; + +- while ((err = v2v_nc2_get_message(vlc->channel, (const volatile void**)&msg, &size, &type, &flags)) ++ if (ctx->config->fastrx) ++ v2v_nc2_request_fast_receive(ctx->channel); ++ ++ while ((err = v2v_nc2_get_message(ctx->channel, (const volatile void**)&msg, &size, &vtype, &vflags)) + == 0) { +- vlc->rx_counter++; ++ ctx->rx_counter++; + header = (struct v2vdrv_frame_header*)msg; + if (!v2vdrv_message_header_check("listener", header, size, +- sizeof(struct v2vdrv_post_internal), +- vlc->rx_counter)) { +- v2v_nc2_finish_message(vlc->channel); ++ sizeof(struct v2vdrv_post_internal))) { ++ v2v_nc2_finish_message(ctx->channel); + return -EBADMSG; + } + +@@ -490,7 +874,7 @@ v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc) + + sum = v2vdrv_checksum((const uint8_t*)msg, header->length); + if (sum != 0) +- printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter); ++ printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter); + + /* Queue a response */ + vlri = kmalloc(sizeof(struct v2vdrv_listener_resp_item), GFP_KERNEL); +@@ -501,35 +885,38 @@ v2vdrv_listener_process_internal_rx(struct v2vdrv_listener_context *vlc) + vlri->resp.header.cs = 0; + vlri->resp.header.length = sizeof(struct v2vdrv_resp_internal); /* header + resp data */ + vlri->resp.status = (sum == 0 ? V2V_MESSAGE_STATUS_OK : V2V_MESSAGE_STATUS_BADCS); +- memcpy(&vlri->resp.guid, vpi->guid, 16); ++ memcpy(&vlri->resp.guid[0], vpi->guid, sizeof(vlri->resp.guid)); + vlri->resp.header.cs = v2vdrv_checksum((const uint8_t*)vlri, sizeof(struct v2vdrv_resp_internal)); +- if (vlc->resp_list) { +- vlc->resp_tail->next = vlri; +- vlc->resp_tail = vlri; ++ if (ctx->r.listener.resp_list) { ++ ctx->r.listener.resp_tail->next = vlri; ++ ctx->r.listener.resp_tail = vlri; + } + else { +- vlc->resp_list = vlri; +- vlc->resp_tail = vlri; ++ ctx->r.listener.resp_list = vlri; ++ ctx->r.listener.resp_tail = vlri; + } + } + else +- printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, vlc); ++ printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, ctx); + +- v2v_nc2_finish_message(vlc->channel); ++ v2v_nc2_finish_message(ctx->channel); + } ++ if (ctx->config->fastrx) ++ v2v_nc2_cancel_fast_receive(ctx->channel); ++ + if (err == -ENODATA) { + /* No more messages */ +- printk("%s listener(%p) no more messages, returning\n", V2VDRV_LOGTAG, vlc); ++ printk("%s listener(%p) no more messages, returning\n", V2VDRV_LOGTAG, ctx); + return 0; + } + + printk("%s listener(%p) receive internal data failure; abort processing - error: %d\n", +- V2VDRV_LOGTAG, vlc, err); ++ V2VDRV_LOGTAG, ctx, err); + return err; /* failure */ + } + + static int +-v2vdrv_listener_process_internal_tx(struct v2vdrv_listener_context *vlc) ++v2vdrv_listener_process_internal_sync_tx(struct v2vdrv_context *ctx) + { + int err; + unsigned available; +@@ -537,59 +924,60 @@ v2vdrv_listener_process_internal_tx(struct v2vdrv_listener_context *vlc) + uint8_t *msgp; + struct v2vdrv_listener_resp_item *vlri; + +- printk("%s listener(%p) sending internal response #%d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1); +- available = v2v_nc2_producer_bytes_available(vlc->channel); +- printk("%s listener(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, vlc, available); +- BUG_ON(vlc->resp_list == NULL); ++ printk("%s listener(%p) sending internal response #%d\n", V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1); ++ available = v2v_nc2_producer_bytes_available(ctx->channel); ++ printk("%s listener(%p) channel indicates minimum bytes available: 0x%x\n", V2VDRV_LOGTAG, ctx, available); ++ BUG_ON(ctx->r.listener.resp_list == NULL); ++ ++ /* No resizing fixed responses for fastrx */ + +- err = v2v_nc2_prep_message(vlc->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg); ++ err = v2v_nc2_prep_message(ctx->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg); + if (err) { + if (err == -EAGAIN) { + /* No room right now, return and try again later */ + printk("%s listener(%p) not enough buffer space to send response #%d; retry\n", +- V2VDRV_LOGTAG, vlc, vlc->tx_counter + 1); ++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1); + return -EAGAIN; + } + printk("%s listener(%p) transmit internal response failure; abort processing - error: %d\n", +- V2VDRV_LOGTAG, vlc, err); ++ V2VDRV_LOGTAG, ctx, err); + return err; /* failure */ + } +- vlc->tx_counter++; /* next message */ +- vlri = vlc->resp_list; +- vlc->resp_list = vlri->next; +- if (!vlc->resp_list) +- vlc->resp_tail = NULL; +- +- /* Response already formed, just copy it in */ +- mb(); ++ ctx->tx_counter++; /* next message */ ++ vlri = ctx->r.listener.resp_list; ++ ctx->r.listener.resp_list = vlri->next; ++ if (!ctx->r.listener.resp_list) ++ ctx->r.listener.resp_tail = NULL; ++ ++ /* Response already formed, just copy it in */ + msgp = (uint8_t*)msg; + memcpy(msgp, vlri, sizeof(struct v2vdrv_resp_internal)); +- mb(); + kfree(vlri); + +- v2v_nc2_send_messages(vlc->channel); ++ v2v_nc2_send_messages(ctx->channel); + + /* Keep the send loop going by setting the event. If there is no more room, the prep message call + will return ERROR_RETRY and just land us back in the wait. */ +- v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND); ++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND); + + return 0; + } + + static void +-v2vdrv_listener_process_messages(struct v2vdrv_listener_context *vlc) ++v2vdrv_listener_process_messages_sync(struct v2vdrv_context *ctx) + { + int err = 0, done = 0; +- enum v2v_endpoint_state state; + struct v2v_wait *wait_state; + u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE; +- u8 reasons; ++ u8 reason; + +- wait_state = v2v_get_wait_state(vlc->channel); ++ printk("%s listener(%p) started SYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx); ++ ++ wait_state = v2v_get_wait_state(ctx->channel); + + /* Start out processing loop, wait for message */ + do { +- if (vlc->resp_list) ++ if (ctx->r.listener.resp_list) + reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE|V2V_WAKE_REASON_SEND; + else + reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_RECEIVE; +@@ -597,102 +985,355 @@ v2vdrv_listener_process_messages(struct v2vdrv_listener_context *vlc) + wait_event(wait_state->wait_event, atomic_xchg(&wait_state->wait_condition, 0) == 1); + + do { +- reasons = v2v_get_wake_reason(vlc->channel, reasons_mask); ++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask); + +- if (reasons & V2V_WAKE_REASON_CONTROL) { +- err = v2v_get_remote_state(vlc->channel, &state); +- if (err) { +- printk("%s listener(%p) failure in v2v_get_remote_state(); aborting - error: %d\n", +- V2VDRV_LOGTAG, vlc, err); +- done = 1; ++ if (reason & V2V_WAKE_REASON_CONTROL) { ++ done = v2vdrv_status_check(ctx, "listener"); ++ if (done) + break; +- } +- printk("%s listener(%p) state changed for other end - new state: %s\n", +- V2VDRV_LOGTAG, vlc, v2v_endpoint_state_name(state)); +- if (v2v_state_requests_disconnect(state)) { +- printk("%s listener(%p) main processing loop ending for disconnect request...\n", +- V2VDRV_LOGTAG, vlc); +- err = 0; +- done = 1; +- break; +- } + } +- +- if (reasons & V2V_WAKE_REASON_SEND) { +- if (vlc->resp_list) { +- err = v2vdrv_listener_process_internal_tx(vlc); ++ else if (reason & V2V_WAKE_REASON_SEND) { ++ if (ctx->r.listener.resp_list) { ++ err = v2vdrv_listener_process_internal_sync_tx(ctx); + if ((err != 0)&&(err != -EAGAIN)) { + done = 1; + break; + } + } + } +- +- if (reasons & V2V_WAKE_REASON_RECEIVE) { +- err = v2vdrv_listener_process_internal_rx(vlc); ++ else if (reason & V2V_WAKE_REASON_RECEIVE) { ++ err = v2vdrv_listener_process_internal_sync_rx(ctx); + if (err) { + done = 1; + break; + } + /* we now have data, set the event again to process sends */ +- v2v_set_wake_reason(vlc->channel, V2V_WAKE_REASON_SEND); ++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_SEND); + } +- } while (reasons != V2V_WAKE_REASON_NONE); ++ } while (reason != V2V_WAKE_REASON_NONE); + + if (done) + break; + } while (1); + } + +-static void +-v2vdrv_listener_disconnect(struct v2vdrv_listener_context *vlc) ++/********************* LISTENER ASYNC ************************/ ++static int v2vdrv_listener_process_internal_async_tx(struct v2vdrv_context *ctx); ++ ++static int ++v2vdrv_listener_process_internal_async_rx(struct v2vdrv_context *ctx) + { + int err; +- uint32_t i = 0; +- struct v2vdrv_listener_resp_item *resp; ++ unsigned long flags; ++ uint32_t rxc = 0; ++ volatile void *msg; ++ size_t size; ++ unsigned vtype; ++ unsigned vflags; ++ struct v2vdrv_frame_header *header; ++ struct v2vdrv_post_internal *vpi; ++ struct v2vdrv_listener_resp_item *vlri; ++ uint8_t *pguid; ++ uint8_t sum; + +- printk("%s listener(%p) Disconnecting...\n", V2VDRV_LOGTAG, vlc); +- err = v2v_disconnect(vlc->channel); +- printk("%s listener(%p) Disconnected - status: %d\n", V2VDRV_LOGTAG, vlc, err); ++ do { ++ /* Create one up front, drop out if we can't */ ++ vlri = kmalloc(sizeof(struct v2vdrv_listener_resp_item), GFP_KERNEL); ++ if (vlri == NULL) { ++ printk("%s listener(%p) cannot queue response; out of memory\n", V2VDRV_LOGTAG, ctx); ++ err = -ENOMEM; ++ break; ++ } + +- printk("%s listener(%p) Sent message counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->tx_counter); +- printk("%s listener(%p) Received response counter: %d\n", V2VDRV_LOGTAG, vlc, vlc->rx_counter); +- if (vlc->tx_counter != vlc->rx_counter) +- printk("%s listener(%p) WARNING Response count does not match the send count\n", V2VDRV_LOGTAG, vlc); ++ /* Critical section where we access the rings - have to lock this area */ ++ spin_lock_irqsave(&ctx->s.async.rx_lock, flags); + +- while (vlc->resp_list) { +- resp = vlc->resp_list; +- vlc->resp_list = resp->next; +- kfree(resp); +- i++; ++ err = v2v_nc2_get_message(ctx->channel, (const volatile void**)&msg, &size, &vtype, &vflags); ++ if (err != 0) { ++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags); ++ kfree(vlri); ++ break; ++ } ++ ++ rxc = ++ctx->rx_counter; ++ header = (struct v2vdrv_frame_header*)msg; ++ if (!v2vdrv_message_header_check("listener", header, size, ++ sizeof(struct v2vdrv_post_internal))) { ++ v2v_nc2_finish_message(ctx->channel); ++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags); ++ kfree(vlri); ++ err = -EBADMSG; ++ break; ++ } ++ ++ vpi = (struct v2vdrv_post_internal*)msg; ++ pguid = &vpi->guid[0]; ++ ++ /* Save the GUID and checksum and exit the critical section */ ++ vlri->resp.header.id = header->id; ++ memcpy(&vlri->resp.guid[0], pguid, sizeof(vlri->resp.guid)); ++ sum = v2vdrv_checksum((const uint8_t*)msg, header->length); ++ ++ v2v_nc2_finish_message(ctx->channel); ++ spin_unlock_irqrestore(&ctx->s.async.rx_lock, flags); ++ ++ /* Out of critical section and the buffer in the ring has been released, ++ * back; can't touch the msg any longer - do some testing and tracing */ ++ pguid = &vlri->resp.guid[0]; ++ printk("------ GUID1=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n", ++ pguid[0], pguid[1], pguid[2], pguid[3], ++ pguid[4], pguid[5], pguid[6], pguid[7]); ++ printk("------ GUID2=%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x:%2.2x\n", ++ pguid[8], pguid[9], pguid[10], pguid[11], ++ pguid[12], pguid[13], pguid[14], pguid[15]); ++ if (sum != 0) ++ printk("%s listener(%p) bad checksumm on message #%d!!!\n", V2VDRV_LOGTAG, ctx, ctx->rx_counter); ++ ++ /* Finish setting up the response and queue it for sending */ ++ vlri->next = NULL; ++ vlri->resp.header.type = V2V_MESSAGE_TYPE_INTERNAL; ++ vlri->resp.header.cs = 0; ++ vlri->resp.header.length = sizeof(struct v2vdrv_resp_internal); /* header + resp data */ ++ vlri->resp.status = (sum == 0 ? V2V_MESSAGE_STATUS_OK : V2V_MESSAGE_STATUS_BADCS); ++ vlri->resp.header.cs = v2vdrv_checksum((const uint8_t*)vlri, sizeof(struct v2vdrv_resp_internal)); ++ ++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags); ++ if (ctx->r.listener.resp_list) { ++ ctx->r.listener.resp_tail->next = vlri; ++ ctx->r.listener.resp_tail = vlri; ++ } ++ else { ++ ctx->r.listener.resp_list = vlri; ++ ctx->r.listener.resp_tail = vlri; ++ } ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ ++ } while (1); ++ ++ /* If we ended the receive handler with this code, there are no more message right ++ now. At this point we can send out any queued responses we might have queued */ ++ if (err == -ENODATA) { ++ /* No more messages */ ++ printk("%s listener(%p) no more messages, calling TX processor\n", V2VDRV_LOGTAG, ctx); ++ return v2vdrv_listener_process_internal_async_tx(ctx); + } +- if (i > 0) +- printk("%s listener(%p) WARNING Found %d unsent responses\n", V2VDRV_LOGTAG, vlc, i); ++ ++ printk("%s listener(%p) receive internal data failure; abort processing - error: %d\n", ++ V2VDRV_LOGTAG, ctx, err); ++ return err; /* failure */ + } + +-void v2vdrv_run_listener(struct v2vdrv_config *config) ++static int ++v2vdrv_listener_process_internal_async_tx(struct v2vdrv_context *ctx) + { +- struct v2vdrv_listener_context *vlc; + int err; ++ unsigned long flags; ++ uint32_t txc = 0; ++ unsigned available; ++ volatile void *msg; ++ uint8_t *msgp; ++ struct v2vdrv_listener_resp_item *vlri; + +- vlc = kmalloc(sizeof(struct v2vdrv_listener_context), GFP_KERNEL); +- if (!vlc) { +- printk("%s listener out of memory\n", V2VDRV_LOGTAG); +- return; ++ /* Loop and send any queued response */ ++ do { ++ /* Critical section where we access the rings - have to lock this area */ ++ spin_lock_irqsave(&ctx->s.async.tx_lock, flags); ++ ++ if (ctx->r.listener.resp_list == NULL) { ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ printk("%s listener(%p) no responses to send in internal TX handler; exiting\n", V2VDRV_LOGTAG, ctx); ++ err = 0; ++ break; ++ } ++ ++ available = v2v_nc2_producer_bytes_available(ctx->channel); ++ err = v2v_nc2_prep_message(ctx->channel, sizeof(struct v2vdrv_resp_internal), V2V_MESSAGE_TYPE_INTERNAL, 0, &msg); ++ if (err) { ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ if (err == -EAGAIN) { ++ /* No room right now, return and try again later */ ++ printk("%s listener(%p) not enough buffer space to send message #%d; retry\n", ++ V2VDRV_LOGTAG, ctx, ctx->tx_counter + 1); ++ err = 0; ++ } ++ else /* failure */ ++ printk("%s listener(%p) transmit internal data failure; abort processing - error: %d\n", ++ V2VDRV_LOGTAG, ctx, err); ++ break; ++ } ++ ++ txc = ++ctx->tx_counter; /* next message */ ++ vlri = ctx->r.listener.resp_list; ++ ctx->r.listener.resp_list = vlri->next; ++ if (!ctx->r.listener.resp_list) ++ ctx->r.listener.resp_tail = NULL; ++ /* Response already formed, just copy it in */ ++ msgp = (uint8_t*)msg; ++ memcpy(msgp, vlri, sizeof(struct v2vdrv_resp_internal)); ++ kfree(vlri); ++ ++ v2v_nc2_send_messages(ctx->channel); ++ spin_unlock_irqrestore(&ctx->s.async.tx_lock, flags); ++ ++ printk("%s listener(%p) sent internal response #%d\n", V2VDRV_LOGTAG, ctx, txc); ++ } while (1); ++ ++ return err; ++} ++ ++static void ++v2vdrv_listener_process_messages_async(struct v2vdrv_context *ctx) ++{ ++ int done = 0; ++ struct v2v_wait *wait_state; ++ u8 reasons_mask = V2V_WAKE_REASON_CONTROL|V2V_WAKE_REASON_TERMINATE; ++ u8 reason; ++ ++ printk("%s listener(%p) started ASYNC processing loop for transfer\n", V2VDRV_LOGTAG, ctx); ++ ++ atomic_inc(&ctx->s.async.running); ++ wait_state = v2v_get_wait_state(ctx->channel); ++ ++ /* Start out processing loop, wait for status changes */ ++ do { ++ wait_event(wait_state->wait_event, ++ atomic_xchg(&wait_state->wait_condition, 0) == 1); ++ ++ do { ++ reason = v2v_get_wake_reason(ctx->channel, reasons_mask); ++ ++ if (reason & V2V_WAKE_REASON_CONTROL) { ++ done = v2vdrv_status_check(ctx, "listener"); ++ if (done) ++ break; ++ } ++ else if (reason & V2V_WAKE_REASON_TERMINATE) { ++ /* The terminate event indicates an error since the listener side does not ++ complete or timeout through this means */ ++ printk("%s listener(%p) async handlers signalled a terminate with error status=0x%x; exiting.\n", ++ V2VDRV_LOGTAG, ctx, ctx->s.async.term_status); ++ done = 1; ++ break; ++ } ++ } while (reason != V2V_WAKE_REASON_NONE); ++ ++ if (done) ++ break; ++ ++ } while (1); ++ ++ atomic_dec(&ctx->s.async.running); ++} ++ ++/********************** ASYNC COMMON *************************/ ++static void ++v2vdrv_rx_work(struct work_struct *arg) ++{ ++ struct v2vdrv_context *ctx = container_of(arg, struct v2vdrv_context, s.async.rx_work); ++ int err; ++ ++ if (ctx->config->role == role_listener) ++ err = v2vdrv_listener_process_internal_async_rx(ctx); ++ else ++ err = v2vdrv_connector_process_internal_async_rx(ctx); ++ ++ if (err) { ++ ctx->s.async.term_status = V2V_TERM_TX_ERROR; ++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE); + } +- memset(vlc, 0, sizeof(struct v2vdrv_listener_context)); +- vlc->config = config; ++} + +- err = v2vdrv_listen_accept(vlc); +- if (err) ++static void ++v2vdrv_tx_work(struct work_struct *arg) ++{ ++ struct v2vdrv_context *ctx = container_of(arg, struct v2vdrv_context, s.async.tx_work); ++ int err; ++ ++ if (ctx->config->role == role_listener) ++ err = v2vdrv_listener_process_internal_async_tx(ctx); ++ else ++ err = v2vdrv_connector_process_internal_async_tx(ctx); ++ ++ if (err) { ++ ctx->s.async.term_status = V2V_TERM_TX_ERROR; ++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE); ++ } ++} ++ ++static void ++v2vdrv_timeout_cb(unsigned long ptr) ++{ ++ struct v2vdrv_context *ctx = (struct v2vdrv_context *)ptr; ++ ++ ctx->s.async.term_status = V2V_TERM_TIMEOUT; ++ v2v_set_wake_reason(ctx->channel, V2V_WAKE_REASON_TERMINATE); ++} ++ ++static void ++v2vdrv_receive_int(void *receive_ctx) ++{ ++ struct v2vdrv_context *ctx = receive_ctx; ++ ++ /* Avoid spurious interrupts before initialization is complete */ ++ if (atomic_read(&ctx->s.async.running) == 0) + return; + +- /* This runs the main processing loop, when it is done we disconnect +- and cleanup regardless of what may have occured */ +- v2vdrv_listener_process_messages(vlc); ++ /* The V2V async APIs allow raw interrupt service routines to be ++ registered to allow maximum flexibility to the client. In almost all ++ cases though, the best practice is to dispatch the work to a bottom ++ half and complete the irq asap. */ ++ schedule_work(&ctx->s.async.rx_work); ++} ++ ++static void ++v2vdrv_send_int(void *send_ctx) ++{ ++ struct v2vdrv_context *ctx = send_ctx; ++ ++ /* Avoid spurious interrupts before initialization is complete */ ++ if (atomic_read(&ctx->s.async.running) == 0) ++ return; ++ ++ /* See comments for v2vdrv_receive_int() */ ++ schedule_work(&ctx->s.async.tx_work); ++} + +- v2vdrv_listener_disconnect(vlc); ++static void ++v2vdrv_control_cb(void *control_ctx) ++{ ++ struct v2vdrv_context *ctx = control_ctx; + +- kfree(vlc); ++ /* Avoid xenstore events before initialization is complete */ ++ if (atomic_read(&ctx->s.async.running) == 0) ++ return; ++ ++ /* The async V2V functionality provides a way to register an ++ * async callback for control message processing as an alternative ++ * to waiting on the control event. For purposes of the sample, ++ * we will use the control event to keep the processing thread ++ * busy waiting for events. ++ */ ++ v2vdrv_status_check(ctx, "ctrlcb"); + } + ++static void ++v2vdrv_async_init(struct v2vdrv_context *ctx) ++{ ++ ctx->s.async.asv.receive_int = v2vdrv_receive_int; ++ ctx->s.async.asv.receive_ctx = ctx; ++ ctx->s.async.asv.send_int = v2vdrv_send_int; ++ ctx->s.async.asv.send_ctx = ctx; ++ ctx->s.async.asv.control_cb = v2vdrv_control_cb; ++ ctx->s.async.asv.control_ctx = ctx; ++ atomic_set(&ctx->s.async.running, 0); ++ INIT_WORK(&ctx->s.async.rx_work, v2vdrv_rx_work); ++ INIT_WORK(&ctx->s.async.tx_work, v2vdrv_tx_work); ++ if (ctx->config->role == role_connector) { ++ init_timer(&ctx->s.async.to_timer); ++ ctx->s.async.to_timer.data = (unsigned long)ctx; ++ ctx->s.async.to_timer.function = v2vdrv_timeout_cb; ++ } ++ spin_lock_init(&ctx->s.async.rx_lock); ++ spin_lock_init(&ctx->s.async.tx_lock); ++ ctx->asvp = &ctx->s.async.asv; ++} +diff --git a/drivers/xen/v2v/v2vutl.c b/drivers/xen/v2v/v2vutl.c +index 3dc36c7..454157c 100644 +--- a/drivers/xen/v2v/v2vutl.c ++++ b/drivers/xen/v2v/v2vutl.c +@@ -223,4 +223,3 @@ v2v_xenops_grant_unmap(struct vm_struct *vm_area, grant_handle_t *ghandles, unsi + + free_vm_area(vm_area); + } +- +diff --git a/include/xen/v2v.h b/include/xen/v2v.h +index abef7be..5233f80 100644 +--- a/include/xen/v2v.h ++++ b/include/xen/v2v.h +@@ -49,6 +49,43 @@ + */ + struct v2v_channel; + ++/* Input structure used for initilializing asynchronous v2v ++ * comminucations. The @receive_int and @send_int values must be provided. ++ * These routines will be called back for receive and send event ++ * notification. The @control_cb may or may not be set depending on whether ++ * the caller wants to get control callbacks or use the control event through ++ * returned by v2v_get_control_event to process changes in control state. ++ * The @control_ctx will be passed in to the control callback routine when ++ * called. ++ * ++ * Note that the @receive_int and @send_int callbacks are actually in the ++ * context of the interrupt service routines for the event channels. This is ++ * done to allow the v2v client the most flexibility in processing events ++ * including doing work in the interrupt context (though this should be ++ * avoided). What this mainly allows is for the client to determine what ++ * bottom half processing it wants to emply. In most cases, the client's ++ * event handler callbacks would simply do something like the following: ++ * ++ * schedule_work(&my_context->my_work); ++ * ++ * And immediately returning; doing the actual event processing in the ++ * work handler. ++ * ++ * Listening, connecting and accepting the v2v channel for asynchronous ++ * operations is done the same way as for synchronous operations (except ++ * for providing this structure to the APIs). Once the channel is established, ++ * the client using v2v in asynchronous mode will be called back to indicate ++ * receipt of data or space availability for further sending. ++ */ ++struct v2v_async { ++ void (*receive_int)(void *receive_ctx); ++ void *receive_ctx; ++ void (*send_int)(void *send_ctx); ++ void *send_ctx; ++ void (*control_cb)(void *control_ctx); ++ void *control_ctx; ++}; ++ + /* Wait state structure returned by v2v_get_wait_state. See this + * function for more details. + */ +@@ -67,12 +104,17 @@ struct v2v_wait { + * It is generally a mistake to have several processes listening on + * the same xenbus prefix. + * ++ * Passing a valid @async_values structure to this routine will ++ * initialize the endpoint for ascynchronous communications operation. ++ * To use synchronous operations, NULL should be passed. ++ * + * Returns 0 on success and an appropriate errno code on failure. + */ + int v2v_listen(const char *xenbus_prefix, + struct v2v_channel **channel, + unsigned prod_ring_page_order, +- unsigned cons_ring_page_order); ++ unsigned cons_ring_page_order, ++ struct v2v_async *async_values); + + /* Wait for a remote domain to connect to the channel @channel, which + * should have been allocated with v2v_listen(). +@@ -90,10 +132,15 @@ int v2v_accept(struct v2v_channel *channel); + * This will fail if the remote endpoint is not currently listening on + * the specified channel. + * ++ * Passing a valid @async_values structure to this routine will ++ * initialize the endpoint for ascynchronous communications operation. ++ * To use synchronous operations, NULL should be passed. ++ * + * Returns 0 on success and an appropriate errno code on failure. + */ + int v2v_connect(const char *xenbus_prefix, +- struct v2v_channel **channel); ++ struct v2v_channel **channel, ++ struct v2v_async *async_values); + + /* Disconnect from a VM-to-VM channel @channel which was previously + * established using either v2v_connect() or v2v_listen(). The channel +@@ -202,11 +249,16 @@ const char *v2v_endpoint_state_name(enum v2v_endpoint_state state); + /* Wake reasons. The values can be OR'd together in the functions below + * to specify more than one wait reason. + */ +-#define V2V_WAKE_REASON_NONE 0x00 +-#define V2V_WAKE_REASON_CONTROL 0x01 +-#define V2V_WAKE_REASON_SEND 0x02 +-#define V2V_WAKE_REASON_RECEIVE 0x04 +-#define V2V_WAKE_REASON_ANY 0xFF ++#define V2V_WAKE_REASON_NONE 0x00 ++#define V2V_WAKE_REASON_CONTROL 0x01 ++#define V2V_WAKE_REASON_SEND 0x02 ++#define V2V_WAKE_REASON_RECEIVE 0x04 ++#define V2V_WAKE_REASON_RESERVED 0x08 ++#define V2V_WAKE_REASON_USER1 0x10 ++#define V2V_WAKE_REASON_USER2 0x20 ++#define V2V_WAKE_REASON_USER3 0x40 ++#define V2V_WAKE_REASON_USER4 0x80 ++#define V2V_WAKE_REASON_ANY 0xFF + + /* Retrieve the wait state object for the @channel. This object allows + * a thread to wait until one of several conditions is true. Normally +@@ -235,19 +287,29 @@ const char *v2v_endpoint_state_name(enum v2v_endpoint_state state); + * V2V_WAKE_REASON_RECEIVE: This reason is set when data is recieved. Note + * that it may sometimes be set when there are no messages incoming, and + * will remain so until v2v_nc2_get_message() is called. ++ * ++ * V2V_WAKE_REASON_USERn: These 4 wait reasons can be used by clients ++ * to add user defined events to the queue. These wake reasons will ++ * never be set internally by the V2V library. ++ * ++ * For asynchronous operation, the wait state object will be used only ++ * for V2V_WAKE_REASON_CONTROL events. + */ + struct v2v_wait *v2v_get_wait_state(struct v2v_channel *channel); + + /* Retrive the @reasons that a wait was satisfied. The @reasons value +- * is an OR'ed list list of the V2V_WAKE_REASONE_* values above. Each ++ * is an OR'ed list list of the V2V_WAKE_REASON_* values above. Each + * call to v2v_get_wake_reason will return that next wake reason in the + * internal queue that matches the @reasons mask. This routine can be + * called multiple times before and after a wait depending on the specific + * use case involved. + * +- * Returns one of V2V_WAKE_REASONE_CONTROL, V2V_WAKE_REASONE_SEND or +- * V2V_WAKE_REASONE_RECEIVE from the internal queue. When no more wake ++ * Returns one of V2V_WAKE_REASON_CONTROL, V2V_WAKE_REASON_SEND or ++ * V2V_WAKE_REASON_RECEIVE from the internal queue. When no more wake + * reasons are present, V2V_WAKE_REASON_NONE is returned. ++ * ++ * For asynchronous operation, only V2V_WAKE_REASON_CONTROL or ++ * V2V_WAKE_REASON_NONE would be returned. + */ + u8 v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons); + +@@ -258,7 +320,7 @@ u8 v2v_get_wake_reason(struct v2v_channel *channel, u8 reasons); + */ + int v2v_set_wake_reason(struct v2v_channel *channel, u8 reason); + +-/* This routine can be used to explicitly set a wake @reason in the ++/* This routine can be used to explicitly clear wake @reasons in the + * internal queue. + */ + void v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons); +@@ -271,6 +333,11 @@ void v2v_clear_wake_reason(struct v2v_channel *channel, u8 reasons); + * written to, and a malicious remote could cause its contents to + * change at any time. + * ++ * When using asynchronous operations mode, the caller must provide ++ * locking (mutual exclusion) to this routine and the finalizing call ++ * to v2v_nc2_finish_message() within the same locked section (using the ++ * same synchronization object). ++ * + * Returns 0 on success. If no more messages are available, -ENODATA + * is returned. On failure an errno error code is returned. + * +@@ -288,6 +355,11 @@ int v2v_nc2_get_message(struct v2v_channel *channel, + * released, so that the remote can use it to send another message, + * and the channel advances to the next incoming message. + * ++ * When using asynchronous operations mode, the caller must provide ++ * locking (mutual exclusion) to this routine and the initializing call ++ * to v2v_nc2_get_message() within the same locked section (using the ++ * same synchronization object). ++ * + * The payload returned by v2v_nc2_get_message() must not be touched + * once this returns. + */ +@@ -302,6 +374,11 @@ void v2v_nc2_finish_message(struct v2v_channel *channel); + * The message is not actually sent until v2v_nc2_send_messages() is + * called. + * ++ * When using asynchronous operations mode, the caller must provide ++ * locking (mutual exclusion) to this routine and the finalizing call ++ * to v2v_nc2_send_messages() within the same locked section (using the ++ * same synchronization object). ++ * + * If there is insufficient space in the ring, this routine requests + * that the remote endpoint set the local ring's send event when more + * space becomes available, the call returns -EAGAIN. +@@ -319,6 +396,11 @@ int v2v_nc2_prep_message(struct v2v_channel *channel, + * (incorrectly behaving remote endpoints can always look ahead) and + * the remote is woken up if appropriate. + * ++ * When using asynchronous operations mode, the caller must provide ++ * locking (mutual exclusion) to this routine and the initializing call ++ * to v2v_nc2_prep_message() within the same locked section (using the ++ * same synchronization object). ++ * + * The client must not touch the payload pointers returned by + * v2v_nc2_prep_message() after calling this function. + */