]> xenbits.xensource.com Git - people/liuw/libxenctrl-split/libvirt.git/commitdiff
Ensure that EOF is dispatched to the stream callback
authorDaniel P. Berrange <berrange@redhat.com>
Tue, 28 Jun 2011 16:58:04 +0000 (17:58 +0100)
committerDaniel P. Berrange <berrange@redhat.com>
Wed, 29 Jun 2011 10:08:59 +0000 (11:08 +0100)
When the remote client receives end of file on the stream
it never invokes the stream callback. Applications relying
on async event driven I/O will thus never see the EOF
condition on the stream

* src/rpc/virnetclient.c, src/rpc/virnetclientstream.c:
  Ensure EOF is dispatched

src/rpc/virnetclient.c
src/rpc/virnetclientstream.c

index dc0ce5107b5dec4be9cced302706dc437ab57fff..39bdf14459b26869da3778d7d24e08fddf383942 100644 (file)
@@ -580,9 +580,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
         if (thecall && thecall->expectReply) {
             VIR_DEBUG("Got sync data packet completion");
             thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
-        } else {
-            // XXX
-            //remoteStreamEventTimerUpdate(privst);
         }
         return 0;
     }
index 9da5aeec44703f17abc4ea69d01023bb2b5d55c1..d5efab12e5125e2660e9cf091d4c84751b050f67 100644 (file)
@@ -55,6 +55,7 @@ struct _virNetClientStream {
     char *incoming;
     size_t incomingOffset;
     size_t incomingLength;
+    bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
     void *cbOpaque;
@@ -73,7 +74,7 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
 
     VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
 
-    if ((st->incomingOffset &&
+    if (((st->incomingOffset || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -96,7 +97,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        st->incomingOffset)
+        (st->incomingOffset || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
@@ -284,24 +285,30 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
 
     virMutexLock(&st->lock);
     need = msg->bufferLength - msg->bufferOffset;
-    size_t avail = st->incomingLength - st->incomingOffset;
-    if (need > avail) {
-        size_t extra = need - avail;
-        if (VIR_REALLOC_N(st->incoming,
-                          st->incomingLength + extra) < 0) {
-            VIR_DEBUG("Out of memory handling stream data");
-            goto cleanup;
+    if (need) {
+        size_t avail = st->incomingLength - st->incomingOffset;
+        if (need > avail) {
+            size_t extra = need - avail;
+            if (VIR_REALLOC_N(st->incoming,
+                              st->incomingLength + extra) < 0) {
+                VIR_DEBUG("Out of memory handling stream data");
+                goto cleanup;
+            }
+            st->incomingLength += extra;
         }
-        st->incomingLength += extra;
-    }
 
-    memcpy(st->incoming + st->incomingOffset,
-           msg->buffer + msg->bufferOffset,
-           msg->bufferLength - msg->bufferOffset);
-    st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+        memcpy(st->incoming + st->incomingOffset,
+               msg->buffer + msg->bufferOffset,
+               msg->bufferLength - msg->bufferOffset);
+        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+    } else {
+        st->incomingEOF = true;
+    }
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu",
-              st->incomingOffset, st->incomingLength);
+    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
+              st->incomingOffset, st->incomingLength,
+              st->incomingEOF);
+    virNetClientStreamEventTimerUpdate(st);
 
     ret = 0;
 
@@ -372,7 +379,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
     virMutexLock(&st->lock);
-    if (!st->incomingOffset) {
+    if (!st->incomingOffset && !st->incomingEOF) {
         virNetMessagePtr msg;
         int ret;