virNetMessagePtr msg;
bool expectReply;
+ bool nonBlock;
+ bool haveThread;
+ bool sentSomeData;
virCond cond;
int wakeupSendFD;
int wakeupReadFD;
- /* List of threads currently waiting for dispatch */
+ /*
+ * List of calls currently waiting for dispatch
+ * The calls should all have threads waiting for
+ * them, except possibly the first call in the list
+ * which might be a partially sent non-blocking call.
+ */
virNetClientCallPtr waitDispatch;
/* True if a thread holds the buck */
bool haveTheBuck;
virNetClientCallPtr thecall;
/* Ok, definitely got an RPC reply now find
- out who's been waiting for it */
+ out which waiting call is associated with it */
thecall = client->waitDispatch;
while (thecall &&
!(thecall->msg->header.prog == client->msg.header.prog &&
ret = virNetSocketWrite(client->sock,
thecall->msg->buffer + thecall->msg->bufferOffset,
thecall->msg->bufferLength - thecall->msg->bufferOffset);
+ if (ret > 0 || virNetSocketHasPendingData(client->sock))
+ thecall->sentSomeData = true;
if (ret <= 0)
return ret;
return false;
/*
- * ...they won't actually wakeup until
+ * ...if the call being removed from the list
+ * still has a thread, then wake that thread up,
+ * otherwise free the call. The latter should
+ * only happen for calls without replies.
+ *
+ * ...the threads won't actually wakeup until
* we release our mutex a short while
* later...
*/
- VIR_DEBUG("Waking up sleeping call %p", call);
- virCondSignal(&call->cond);
+ if (call->haveThread) {
+ VIR_DEBUG("Waking up sleep %p", call);
+ virCondSignal(&call->cond);
+ } else {
+ if (call->expectReply)
+ VIR_WARN("Got a call expecting a reply but without a waiting thread");
+ ignore_value(virCondDestroy(&call->cond));
+ VIR_FREE(call->msg);
+ VIR_FREE(call);
+ }
return true;
}
+static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
+ void *opaque)
+{
+ virNetClientCallPtr thiscall = opaque;
+
+ if (call == thiscall)
+ return false;
+
+ if (!call->nonBlock)
+ return false;
+
+ if (call->sentSomeData) {
+ /*
+ * If some data has been sent we must keep it in the list,
+ * but still wakeup any thread
+ */
+ if (call->haveThread) {
+ VIR_DEBUG("Waking up sleep %p", call);
+ virCondSignal(&call->cond);
+ }
+ return false;
+ } else {
+ /*
+ * If no data has been sent, we can remove it from the list.
+ * Wakup any thread, otherwise free the caller ourselves
+ */
+ if (call->haveThread) {
+ VIR_DEBUG("Waking up sleep %p", call);
+ virCondSignal(&call->cond);
+ } else {
+ if (call->expectReply)
+ VIR_WARN("Got a call expecting a reply but without a waiting thread");
+ ignore_value(virCondDestroy(&call->cond));
+ VIR_FREE(call->msg);
+ VIR_FREE(call);
+ }
+ return true;
+ }
+}
+
+
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
{
VIR_DEBUG("Giving up the buck %p", thiscall);
/* See if someone else is still waiting
* and if so, then pass the buck ! */
while (tmp) {
- if (tmp != thiscall) {
+ if (tmp != thiscall && tmp->haveThread) {
VIR_DEBUG("Passing the buck to %p", tmp);
virCondSignal(&tmp->cond);
break;
}
tmp = tmp->next;
}
+ VIR_DEBUG("No thread to pass the buck to");
+}
+
+
+static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED)
+{
+ return call->nonBlock;
}
/*
* Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck
* to someone else.
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
*/
static int virNetClientIOEventLoop(virNetClientPtr client,
virNetClientCallPtr thiscall)
if (virNetSocketHasCachedData(client->sock))
timeout = 0;
+ /* If there are any non-blocking calls in the queue,
+ * then we don't want to sleep in poll()
+ */
+ if (virNetClientCallMatchPredicate(client->waitDispatch,
+ virNetClientIOEventLoopWantNonBlock,
+ NULL))
+ timeout = 0;
+
fds[0].events = fds[0].revents = 0;
fds[1].events = fds[1].revents = 0;
/* If we have existing SASL decoded data, pretend
* the socket became readable so we consume it
*/
- if (virNetSocketHasCachedData(client->sock))
+ if (virNetSocketHasCachedData(client->sock)) {
fds[0].revents |= POLLIN;
+ }
if (fds[1].revents) {
VIR_DEBUG("Woken up from poll by other thread");
}
if (ret < 0) {
+ /* XXX what's this dubious errno check doing ? */
if (errno == EWOULDBLOCK)
continue;
virReportSystemError(errno,
goto error;
}
- /* Iterate through waiting threads and if
- * any are complete then tell 'em to wakeup
+ /* Iterate through waiting calls and if any are
+ * complete, remove them from the dispatch list..
*/
virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone,
thiscall);
+ /* Iterate through waiting calls and if any are
+ * non-blocking, remove them from the dispatch list...
+ */
+ virNetClientCallRemovePredicate(&client->waitDispatch,
+ virNetClientIOEventLoopRemoveNonBlocking,
+ thiscall);
+
/* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetClientIOEventLoopPassTheBuck(client, thiscall);
- return 0;
+ return 2;
}
+ /* We're not done, but we're non-blocking */
+ if (thiscall->nonBlock) {
+ virNetClientIOEventLoopPassTheBuck(client, thiscall);
+ return thiscall->sentSomeData ? 1 : 0;
+ }
if (fds[0].revents & (POLLHUP | POLLERR)) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
* a strategy in power politics when the actions of one country/
* nation are blamed on another, providing an opportunity for war."
*
- * NB(5) Don't Panic!
+ * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller
+ * must *NOT* free it, if this returns '1' (ie partial send).
+ *
+ * NB(6) The following input states are valid if *no* threads
+ * are currently executing this method
+ *
+ * - waitDispatch == NULL,
+ * - waitDispatch != NULL, waitDispatch.nonBlock == true
+ *
+ * The following input states are valid, if n threads are currently
+ * executing
+ *
+ * - waitDispatch != NULL
+ * - 0 or 1 waitDispatch.nonBlock == false, without any threads
+ * - 0 or more waitDispatch.nonBlock == false, with threads
+ *
+ * The following output states are valid when all threads are done
+ *
+ * - waitDispatch == NULL,
+ * - waitDispatch != NULL, waitDispatch.nonBlock == true
+ *
+ * NB(7) Don't Panic!
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
*/
static int virNetClientIO(virNetClientPtr client,
virNetClientCallPtr thiscall)
}
VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall);
- /* Two reasons we can be woken up
+ /* Three reasons we can be woken up
* 1. Other thread has got our reply ready for us
* 2. Other thread is all done, and it is our turn to
* be the dispatcher to finish waiting for
* our reply
+ * 3. I/O was expected to block
*/
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
- rv = 0;
+ rv = 2;
/*
* We avoided catching the buck and our reply is ready !
* We've already had 'thiscall' removed from the list
goto cleanup;
}
+ /* If we're non-blocking, get outta here */
+ if (thiscall->nonBlock) {
+ if (thiscall->sentSomeData)
+ rv = 1; /* In progress */
+ else
+ rv = 0; /* none at all */
+ goto cleanup;
+ }
+
/* Grr, someone passed the buck onto us ... */
}
}
+/*
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
+ */
static int virNetClientSendInternal(virNetClientPtr client,
virNetMessagePtr msg,
- bool expectReply)
+ bool expectReply,
+ bool nonBlock)
{
virNetClientCallPtr call;
int ret = -1;
return -1;
}
+ if (expectReply && nonBlock) {
+ virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Attempt to send an non-blocking message with a synchronous reply"));
+ return -1;
+ }
+
if (VIR_ALLOC(call) < 0) {
virReportOOMError();
return -1;
call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
call->msg = msg;
call->expectReply = expectReply;
+ call->nonBlock = nonBlock;
+ call->haveThread = true;
ret = virNetClientIO(client, call);
cleanup:
- ignore_value(virCondDestroy(&call->cond));
- VIR_FREE(call);
+ /* If partially sent, then the call is still on the dispatch queue */
+ if (ret == 1) {
+ call->haveThread = false;
+ } else {
+ ignore_value(virCondDestroy(&call->cond));
+ VIR_FREE(call);
+ }
virNetClientUnlock(client);
return ret;
}
+
/*
* @msg: a message allocated on heap or stack
*
int virNetClientSendWithReply(virNetClientPtr client,
virNetMessagePtr msg)
{
- int ret = virNetClientSendInternal(client, msg, true);
+ int ret = virNetClientSendInternal(client, msg, true, false);
if (ret < 0)
return -1;
return 0;
int virNetClientSendNoReply(virNetClientPtr client,
virNetMessagePtr msg)
{
- int ret = virNetClientSendInternal(client, msg, false);
+ int ret = virNetClientSendInternal(client, msg, false, false);
if (ret < 0)
return -1;
return 0;
}
+
+/*
+ * @msg: a message allocated on the heap.
+ *
+ * Send a message asynchronously, without any reply
+ *
+ * The caller is responsible for free'ing @msg, *except* if
+ * this method returns -1.
+ *
+ * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
+ */
+int virNetClientSendNonBlock(virNetClientPtr client,
+ virNetMessagePtr msg)
+{
+ return virNetClientSendInternal(client, msg, false, true);
+}