]> xenbits.xensource.com Git - people/liuw/libxenctrl-split/libvirt.git/commitdiff
rpc: Fix slow volume download (virsh vol-download)
authorOssi Herrala <oherrala@gmail.com>
Mon, 20 Jul 2015 12:44:32 +0000 (12:44 +0000)
committerMartin Kletzander <mkletzan@redhat.com>
Mon, 3 Aug 2015 11:08:00 +0000 (13:08 +0200)
Use I/O vector (iovec) instead of one huge memory buffer as suggested
in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
doing memmove() to big buffers and performance doesn't degrade if
source (virNetClientStreamQueuePacket()) is faster than sink
(virNetClientStreamRecvPacket()).

Resolves: http://bugzilla.redhat.com/1026137

Signed-off-by: Martin Kletzander <mkletzan@redhat.com>
src/rpc/virnetclientstream.c

index b428f4b4d575c234abcbe160eb324fa157a23d39..1cc9002b0ad740583f9a9a7be8a338cdf3c1632f 100644 (file)
@@ -49,9 +49,9 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    char *incoming;
-    size_t incomingOffset;
-    size_t incomingLength;
+    struct iovec *incomingVec; /* I/O Vector to hold data */
+    size_t writeVec;           /* Vectors produced */
+    size_t readVec;            /* Vectors consumed */
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
 
-    if (((st->incomingOffset || st->incomingEOF) &&
+    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        (st->incomingOffset || st->incomingEOF))
+        ((st->readVec < st->writeVec) || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
+              st->readVec, st->writeVec);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incoming);
+    VIR_FREE(st->incomingVec);
     virObjectUnref(st->prog);
 }
 
@@ -265,38 +266,50 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
     int ret = -1;
-    size_t need;
+    struct iovec iov;
+    char *base;
+    size_t piece, pieces, length, offset = 0, size = 1024*1024;
 
     virObjectLock(st);
-    need = msg->bufferLength - msg->bufferOffset;
-    if (need) {
-        size_t avail = st->incomingLength - st->incomingOffset;
-        if (need > avail) {
-            size_t extra = need - avail;
-            if (VIR_REALLOC_N(st->incoming,
-                              st->incomingLength + extra) < 0) {
-                VIR_DEBUG("Out of memory handling stream data");
-                goto cleanup;
-            }
-            st->incomingLength += extra;
-        }
 
-        memcpy(st->incoming + st->incomingOffset,
-               msg->buffer + msg->bufferOffset,
-               msg->bufferLength - msg->bufferOffset);
-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
-    } else {
+    length = msg->bufferLength - msg->bufferOffset;
+
+    if (length == 0) {
         st->incomingEOF = true;
+        goto end;
     }
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
-              st->incomingOffset, st->incomingLength,
-              st->incomingEOF);
-    virNetClientStreamEventTimerUpdate(st);
+    pieces = (length + size - 1) / size;
+    for (piece = 0; piece < pieces; piece++) {
+        if (size > length - offset)
+            size = length - offset;
+
+        if (VIR_ALLOC_N(base, size)) {
+            VIR_DEBUG("Allocation failed");
+            goto cleanup;
+        }
+
+        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
+        iov.iov_base = base;
+        iov.iov_len = size;
+        offset += size;
 
+        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
+            VIR_DEBUG("Append failed");
+            VIR_FREE(base);
+            goto cleanup;
+        }
+        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
+                  st->readVec, st->writeVec, size);
+    }
+
+ end:
+    virNetClientStreamEventTimerUpdate(st);
     ret = 0;
 
  cleanup:
+    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
+              st->readVec, st->writeVec, st->incomingEOF);
     virObjectUnlock(st);
     return ret;
 }
@@ -361,17 +374,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  size_t nbytes,
                                  bool nonblock)
 {
-    int rv = -1;
+    int ret = -1;
+    size_t partial, offset;
+
+    virObjectLock(st);
+
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
-    virObjectLock(st);
-    if (!st->incomingOffset && !st->incomingEOF) {
+
+    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
         virNetMessagePtr msg;
-        int ret;
+        int rv;
 
         if (nonblock) {
             VIR_DEBUG("Non-blocking mode and no data available");
-            rv = -2;
+            ret = -2;
             goto cleanup;
         }
 
@@ -387,37 +404,66 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
 
         VIR_DEBUG("Dummy packet to wait for stream data");
         virObjectUnlock(st);
-        ret = virNetClientSendWithReplyStream(client, msg, st);
+        rv = virNetClientSendWithReplyStream(client, msg, st);
         virObjectLock(st);
         virNetMessageFree(msg);
 
-        if (ret < 0)
+        if (rv < 0)
             goto cleanup;
     }
 
-    VIR_DEBUG("After IO %zu", st->incomingOffset);
-    if (st->incomingOffset) {
-        int want = st->incomingOffset;
-        if (want > nbytes)
-            want = nbytes;
-        memcpy(data, st->incoming, want);
-        if (want < st->incomingOffset) {
-            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
-            st->incomingOffset -= want;
-        } else {
-            VIR_FREE(st->incoming);
-            st->incomingOffset = st->incomingLength = 0;
+    offset = 0;
+    partial = nbytes;
+
+    while (st->incomingVec && (st->readVec < st->writeVec)) {
+        struct iovec *iov = st->incomingVec + st->readVec;
+
+        if (!iov || !iov->iov_base) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           "%s", _("NULL pointer encountered"));
+            goto cleanup;
         }
-        rv = want;
-    } else {
-        rv = 0;
+
+        if (partial < iov->iov_len) {
+            memcpy(data+offset, iov->iov_base, partial);
+            memmove(iov->iov_base, (char*)iov->iov_base+partial,
+                    iov->iov_len-partial);
+            iov->iov_len -= partial;
+            offset += partial;
+            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
+            break;
+        }
+
+        memcpy(data+offset, iov->iov_base, iov->iov_len);
+        VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
+        partial -= iov->iov_len;
+        offset += iov->iov_len;
+        VIR_FREE(iov->iov_base);
+        iov->iov_len = 0;
+        st->readVec++;
+
+        VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu",
+                  offset, st->readVec, st->writeVec);
     }
 
+    /* Shrink the I/O Vector buffer to free up memory. Do the
+       shrinking only when there is selected amount or more buffers to
+       free so it doesn't constantly memmove() and realloc() buffers.
+     */
+    if (st->readVec >= 16) {
+        memmove(st->incomingVec, st->incomingVec + st->readVec,
+                sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
+        VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
+        VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
+        st->readVec = 0;
+    }
+
+    ret = offset;
     virNetClientStreamEventTimerUpdate(st);
 
  cleanup:
     virObjectUnlock(st);
-    return rv;
+    return ret;
 }