]> xenbits.xensource.com Git - libvirt.git/commitdiff
Fix tracking of RPC messages wrt streams
authorDaniel P. Berrange <berrange@redhat.com>
Wed, 31 Aug 2011 16:42:58 +0000 (17:42 +0100)
committerDaniel P. Berrange <berrange@redhat.com>
Thu, 1 Sep 2011 09:52:35 +0000 (10:52 +0100)
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing

-            if (msg->header.type == VIR_NET_REPLY) {
+            if (msg->header.type == VIR_NET_REPLY ||
+                (msg->header.type == VIR_NET_STREAM &&
+                 msg->header.status != VIR_NET_CONTINUE)) {
                 client->nrequests--;

In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.

Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.

Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do

* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
  a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
  src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
  src/rpc/virnetserverprogram.c: Switch over to use
  virNetMessageClear() and pass in the 'bool tracked' value
  when creating messages.

daemon/remote.c
daemon/stream.c
src/rpc/virnetclientprogram.c
src/rpc/virnetclientstream.c
src/rpc/virnetmessage.c
src/rpc/virnetmessage.h
src/rpc/virnetserverclient.c
src/rpc/virnetserverprogram.c

index 34c63648dc570c4627df868afe182ecf93f49bca..d5ead81e01db23a574722cfa8a623c0acad2618f 100644 (file)
@@ -2495,7 +2495,7 @@ remoteDispatchDomainEventSend(virNetServerClientPtr client,
 {
     virNetMessagePtr msg;
 
-    if (!(msg = virNetMessageNew()))
+    if (!(msg = virNetMessageNew(false)))
         goto cleanup;
 
     msg->header.prog = virNetServerProgramGetID(program);
index ba3adc21c902bc57b4b02c65e6f31552f487060c..e3214c2f5704d791a3e7b2d7f372c3053299d959 100644 (file)
@@ -207,7 +207,7 @@ daemonStreamEvent(virStreamPtr st, int events, void *opaque)
             virNetError(VIR_ERR_RPC,
                         "%s", _("stream had I/O failure"));
 
-        msg = virNetMessageNew();
+        msg = virNetMessageNew(false);
         if (!msg) {
             ret = -1;
         } else {
@@ -344,7 +344,7 @@ int daemonFreeClientStream(virNetServerClientPtr client,
         virNetMessagePtr tmp = msg->next;
         if (client) {
             /* Send a dummy reply to free up 'msg' & unblock client rx */
-            memset(msg, 0, sizeof(*msg));
+            virNetMessageClear(msg);
             msg->header.type = VIR_NET_REPLY;
             if (virNetServerClientSendMessage(client, msg) < 0) {
                 virNetServerClientImmediateClose(client);
@@ -653,7 +653,7 @@ daemonStreamHandleWrite(virNetServerClientPtr client,
          * its active request count / throttling
          */
         if (msg->header.status == VIR_NET_CONTINUE) {
-            memset(msg, 0, sizeof(*msg));
+            virNetMessageClear(msg);
             msg->header.type = VIR_NET_REPLY;
             if (virNetServerClientSendMessage(client, msg) < 0) {
                 virNetMessageFree(msg);
@@ -715,7 +715,7 @@ daemonStreamHandleRead(virNetServerClientPtr client,
 
         memset(&rerr, 0, sizeof(rerr));
 
-        if (!(msg = virNetMessageNew()))
+        if (!(msg = virNetMessageNew(false)))
             ret = -1;
         else
             ret = virNetServerProgramSendStreamError(remoteProgram,
@@ -729,7 +729,7 @@ daemonStreamHandleRead(virNetServerClientPtr client,
         stream->tx = 0;
         if (ret == 0)
             stream->recvEOF = 1;
-        if (!(msg = virNetMessageNew()))
+        if (!(msg = virNetMessageNew(false)))
             ret = -1;
 
         if (msg) {
index c39520abfd831fe83b80db487bec5ee4db7fed9d..a07b744d3cfc3b404e4a3e88f348b1357112e159 100644 (file)
@@ -272,7 +272,7 @@ int virNetClientProgramCall(virNetClientProgramPtr prog,
 {
     virNetMessagePtr msg;
 
-    if (!(msg = virNetMessageNew()))
+    if (!(msg = virNetMessageNew(false)))
         return -1;
 
     msg->header.prog = prog->program;
index fe15acdf614da936c2405bc3451abbe0b9173ff4..2cc84d40dc31712640e2f06402c6b6f1f5f0a774 100644 (file)
@@ -328,7 +328,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
     bool wantReply;
     VIR_DEBUG("st=%p status=%d data=%p nbytes=%zu", st, status, data, nbytes);
 
-    if (!(msg = virNetMessageNew()))
+    if (!(msg = virNetMessageNew(false)))
         return -1;
 
     virMutexLock(&st->lock);
@@ -390,7 +390,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
             goto cleanup;
         }
 
-        if (!(msg = virNetMessageNew())) {
+        if (!(msg = virNetMessageNew(false))) {
             virReportOOMError();
             goto cleanup;
         }
index 072549190bb402a17c6afc18b3329eeb7f2c8342..a1ae9c4d7b5a32c83d1aff2695e165fe573143d7 100644 (file)
@@ -32,7 +32,7 @@
     virReportErrorHelper(VIR_FROM_THIS, code, __FILE__,           \
                          __FUNCTION__, __LINE__, __VA_ARGS__)
 
-virNetMessagePtr virNetMessageNew(void)
+virNetMessagePtr virNetMessageNew(bool tracked)
 {
     virNetMessagePtr msg;
 
@@ -41,11 +41,21 @@ virNetMessagePtr virNetMessageNew(void)
         return NULL;
     }
 
-    VIR_DEBUG("msg=%p", msg);
+    msg->tracked = tracked;
+    VIR_DEBUG("msg=%p tracked=%d", msg, tracked);
 
     return msg;
 }
 
+
+void virNetMessageClear(virNetMessagePtr msg)
+{
+    bool tracked = msg->tracked;
+    memset(msg, 0, sizeof(*msg));
+    msg->tracked = tracked;
+}
+
+
 void virNetMessageFree(virNetMessagePtr msg)
 {
     if (!msg)
index 2aae3f6499ce1476c7c3eae44bf2c43ecebf27da..307a0413ef4e90521c971fe50b00c4b9e91de557 100644 (file)
@@ -35,6 +35,8 @@ typedef void (*virNetMessageFreeCallback)(virNetMessagePtr msg, void *opaque);
  * use virNetMessageNew() to allocate on the heap
  */
 struct _virNetMessage {
+    bool tracked;
+
     char buffer[VIR_NET_MESSAGE_MAX + VIR_NET_MESSAGE_LEN_MAX];
     size_t bufferLength;
     size_t bufferOffset;
@@ -48,7 +50,9 @@ struct _virNetMessage {
 };
 
 
-virNetMessagePtr virNetMessageNew(void);
+virNetMessagePtr virNetMessageNew(bool tracked);
+
+void virNetMessageClear(virNetMessagePtr);
 
 void virNetMessageFree(virNetMessagePtr msg);
 
index a73b06d692438349db1c539793b9d6e708101a9c..412814def9b8f4989a09812200f0c59c742c3b24 100644 (file)
@@ -277,7 +277,7 @@ virNetServerClientCheckAccess(virNetServerClientPtr client)
         return -1;
     }
 
-    if (!(confirm = virNetMessageNew()))
+    if (!(confirm = virNetMessageNew(false)))
         return -1;
 
     /* Checks have succeeded.  Write a '\1' byte back to the client to
@@ -323,7 +323,7 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock,
         virNetTLSContextRef(tls);
 
     /* Prepare one for packet receive */
-    if (!(client->rx = virNetMessageNew()))
+    if (!(client->rx = virNetMessageNew(true)))
         goto error;
     client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
     client->nrequests = 1;
@@ -805,7 +805,7 @@ readmore:
 
         /* Possibly need to create another receive buffer */
         if (client->nrequests < client->nrequests_max) {
-            if (!(client->rx = virNetMessageNew())) {
+            if (!(client->rx = virNetMessageNew(true))) {
                 client->wantClose = true;
             } else {
                 client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
@@ -885,16 +885,14 @@ virNetServerClientDispatchWrite(virNetServerClientPtr client)
             /* Get finished msg from head of tx queue */
             msg = virNetMessageQueueServe(&client->tx);
 
-            if (msg->header.type == VIR_NET_REPLY ||
-                (msg->header.type == VIR_NET_STREAM &&
-                 msg->header.status != VIR_NET_CONTINUE)) {
+            if (msg->tracked) {
                 client->nrequests--;
                 /* See if the recv queue is currently throttled */
                 if (!client->rx &&
                     client->nrequests < client->nrequests_max) {
                     /* Ready to recv more messages */
+                    virNetMessageClear(msg);
                     client->rx = msg;
-                    memset(client->rx, 0, sizeof(*client->rx));
                     client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
                     msg = NULL;
                     client->nrequests++;
index 2e9e3f7a156b8e82c5fd488eb132dc2627245e2b..643a97dba026b31c6578b38aa88f87cc87b0cbad 100644 (file)
@@ -284,7 +284,7 @@ int virNetServerProgramDispatch(virNetServerProgramPtr prog,
         VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d",
                  msg->header.serial, msg->header.proc, msg->header.status);
         /* Send a dummy reply to free up 'msg' & unblock client rx */
-        memset(msg, 0, sizeof(*msg));
+        virNetMessageClear(msg);
         msg->header.type = VIR_NET_REPLY;
         if (virNetServerClientSendMessage(client, msg) < 0) {
             ret = -1;