--- /dev/null
+/*
+ * esx_stream.c: libcurl based stream driver
+ *
+ * Copyright (C) 2012-2014 Matthias Bolte <matthias.bolte@googlemail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <config.h>
+
+#include "internal.h"
+#include "datatypes.h"
+#include "viralloc.h"
+#include "virstring.h"
+#include "esx_stream.h"
+
+#define VIR_FROM_THIS VIR_FROM_ESX
+
+/*
+ * This libcurl based stream driver cannot use a libcurl easy handle alone
+ * because curl_easy_perform would do the whole transfer before it returns.
+ * But there is no place in the stream handling concept that would allow for
+ * such a call to be made. The stream is driven by esxStream(Send|Recv) which
+ * is probably called multiple times to send/receive the stream in chunks.
+ * Therefore, a libcurl multi handle is used that allows to perform the data
+ * transfer in chunks and also allows to support non-blocking operations.
+ *
+ * In the upload direction esxStreamSend is called to push data into the
+ * stream and libcurl will call esxVI_CURL_ReadStream to pull data out of
+ * the stream to upload it via HTTP(S). To realize this esxStreamSend calls
+ * esxStreamTransfer that uses esxVI_MultiCURL_(Wait|Perform) to drive the
+ * transfer and makes libcurl read up the data passed to esxStreamSend.
+ *
+ * In the download direction esxStreamRecv is called to pull data out of the
+ * stream and libcurl will call esxVI_CURL_WriteStream to push data into the
+ * stream that it has downloaded via HTTP(S). To realize this esxStreamRecv
+ * calls esxStreamTransfer that uses esxVI_MultiCURL_(Wait|Perform) to drive
+ * the transfer and makes libcurl write to the buffer passed to esxStreamRecv.
+ *
+ * The download direction requires some extra logic because libcurl might
+ * call esxVI_CURL_WriteStream with more data than there is space left in the
+ * buffer passed to esxStreamRecv. But esxVI_CURL_WriteStream is not allowed
+ * to handle only a part of the incoming data, it needs to handle it all at
+ * once. Therefore the stream driver manages a backlog buffer that holds the
+ * extra data that didn't fit into the esxStreamRecv buffer anymore. The next
+ * time esxStreamRecv is called it'll read the data from the backlog buffer
+ * first before asking libcurl for more data.
+ *
+ * Typically libcurl will call esxVI_CURL_WriteStream with up to 16kb data
+ * this means that the typically maximum backlog size should be 16kb as well.
+ */
+
+enum _esxStreamMode {
+ ESX_STREAM_MODE_UPLOAD = 1,
+ ESX_STREAM_MODE_DOWNLOAD = 2
+};
+
+typedef struct _esxStreamPrivate esxStreamPrivate;
+typedef enum _esxStreamMode esxStreamMode;
+
+struct _esxStreamPrivate {
+ esxVI_CURL *curl;
+ int mode;
+
+ /* Backlog of downloaded data that has not been esxStreamRecv'ed yet */
+ char *backlog;
+ size_t backlog_size;
+ size_t backlog_used;
+
+ /* Buffer given to esxStream(Send|Recv) to (read|write) data (from|to) */
+ char *buffer;
+ size_t buffer_size;
+ size_t buffer_used;
+};
+
+static size_t
+esxVI_CURL_ReadStream(char *output, size_t size, size_t nmemb, void *userdata)
+{
+ esxStreamPrivate *priv = userdata;
+ size_t output_size = size * nmemb;
+ size_t output_used = 0;
+
+ if (output_size > priv->buffer_used)
+ output_used = priv->buffer_used;
+ else
+ output_used = output_size;
+
+ memcpy(output, priv->buffer + priv->buffer_size - priv->buffer_used,
+ output_used);
+
+ priv->buffer_used -= output_used;
+
+ return output_used;
+}
+
+static size_t
+esxVI_CURL_WriteStream(char *input, size_t size, size_t nmemb, void *userdata)
+{
+ esxStreamPrivate *priv = userdata;
+ size_t input_size = size * nmemb;
+ size_t input_used = priv->buffer_size - priv->buffer_used;
+
+ if (input_size == 0)
+ return input_size;
+
+ if (input_used > input_size)
+ input_used = input_size;
+
+ /* Fill buffer */
+ memcpy(priv->buffer + priv->buffer_used, input, input_used);
+ priv->buffer_used += input_used;
+
+ /* Move rest to backlog */
+ if (input_size > input_used) {
+ size_t input_remaining = input_size - input_used;
+ size_t backlog_remaining = priv->backlog_size - priv->backlog_used;
+
+ if (!priv->backlog) {
+ priv->backlog_size = input_remaining;
+ priv->backlog_used = 0;
+
+ if (VIR_ALLOC_N(priv->backlog, priv->backlog_size) < 0)
+ return 0;
+ } else if (input_remaining > backlog_remaining) {
+ priv->backlog_size += input_remaining - backlog_remaining;
+
+ if (VIR_REALLOC_N(priv->backlog, priv->backlog_size) < 0)
+ return 0;
+ }
+
+ memcpy(priv->backlog + priv->backlog_used, input + input_used,
+ input_remaining);
+
+ priv->backlog_used += input_remaining;
+ }
+
+ return input_size;
+}
+
+/* Returns -1 on error, 0 if it needs to be called again, and 1 if it's done for now */
+static int
+esxStreamTransfer(esxStreamPrivate *priv, bool blocking)
+{
+ int runningHandles = 0;
+ long responseCode = 0;
+ int status;
+ CURLcode errorCode;
+
+ if (blocking) {
+ if (esxVI_MultiCURL_Wait(priv->curl->multi, &runningHandles) < 0)
+ return -1;
+ } else {
+ if (esxVI_MultiCURL_Perform(priv->curl->multi, &runningHandles) < 0)
+ return -1;
+ }
+
+ if (runningHandles == 0) {
+ /* Transfer is done check for result */
+ status = esxVI_MultiCURL_CheckFirstMessage(priv->curl->multi,
+ &responseCode, &errorCode);
+
+ if (status == 0) {
+ /* No message, transfer finished successfully */
+ return 1;
+ }
+
+ if (status < 0) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Could not complete transfer: %s (%d)"),
+ curl_easy_strerror(errorCode), errorCode);
+ return -1;
+ }
+
+ if (responseCode != 200 && responseCode != 206) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Unexpected HTTP response code %lu"),
+ responseCode);
+ return -1;
+ }
+
+ return 1;
+ }
+
+ return blocking ? 0 : 1;
+}
+
+static int
+esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes)
+{
+ int result = -1;
+ esxStreamPrivate *priv = stream->privateData;
+ int status;
+
+ if (nbytes == 0)
+ return 0;
+
+ if (!priv) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
+ return -1;
+ }
+
+ if (priv->mode != ESX_STREAM_MODE_UPLOAD) {
+ virReportError(VIR_ERR_INVALID_ARG, "%s", _("Not an upload stream"));
+ return -1;
+ }
+
+ virMutexLock(&priv->curl->lock);
+
+ priv->buffer = (char *)data;
+ priv->buffer_size = nbytes;
+ priv->buffer_used = nbytes;
+
+ if (stream->flags & VIR_STREAM_NONBLOCK) {
+ if (esxStreamTransfer(priv, false) < 0)
+ goto cleanup;
+
+ if (priv->buffer_used < priv->buffer_size)
+ result = priv->buffer_size - priv->buffer_used;
+ else
+ result = -2;
+ } else /* blocking */ {
+ do {
+ status = esxStreamTransfer(priv, true);
+
+ if (status < 0)
+ goto cleanup;
+
+ if (status > 0)
+ break;
+ } while (priv->buffer_used > 0);
+
+ result = priv->buffer_size - priv->buffer_used;
+ }
+
+ cleanup:
+ virMutexUnlock(&priv->curl->lock);
+
+ return result;
+}
+
+static int
+esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes)
+{
+ int result = -1;
+ esxStreamPrivate *priv = stream->privateData;
+ int status;
+
+ if (nbytes == 0)
+ return 0;
+
+ if (!priv) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
+ return -1;
+ }
+
+ if (priv->mode != ESX_STREAM_MODE_DOWNLOAD) {
+ virReportError(VIR_ERR_INVALID_ARG, "%s", _("Not a download stream"));
+ return -1;
+ }
+
+ virMutexLock(&priv->curl->lock);
+
+ priv->buffer = data;
+ priv->buffer_size = nbytes;
+ priv->buffer_used = 0;
+
+ if (priv->backlog_used > 0) {
+ if (priv->buffer_size > priv->backlog_used)
+ priv->buffer_used = priv->backlog_used;
+ else
+ priv->buffer_used = priv->buffer_size;
+
+ memcpy(priv->buffer, priv->backlog, priv->buffer_used);
+ memmove(priv->backlog, priv->backlog + priv->buffer_used,
+ priv->backlog_used - priv->buffer_used);
+
+ priv->backlog_used -= priv->buffer_used;
+ result = priv->buffer_used;
+ } else if (stream->flags & VIR_STREAM_NONBLOCK) {
+ if (esxStreamTransfer(priv, false) < 0)
+ goto cleanup;
+
+ if (priv->buffer_used > 0)
+ result = priv->buffer_used;
+ else
+ result = -2;
+ } else /* blocking */ {
+ do {
+ status = esxStreamTransfer(priv, true);
+
+ if (status < 0)
+ goto cleanup;
+
+ if (status > 0)
+ break;
+ } while (priv->buffer_used < priv->buffer_size);
+
+ result = priv->buffer_used;
+ }
+
+ cleanup:
+ virMutexUnlock(&priv->curl->lock);
+
+ return result;
+}
+
+static void
+esxFreeStreamPrivate(esxStreamPrivate **priv)
+{
+ if (!priv || !*priv)
+ return;
+
+ esxVI_CURL_Free(&(*priv)->curl);
+ VIR_FREE((*priv)->backlog);
+ VIR_FREE(*priv);
+}
+
+static int
+esxStreamClose(virStreamPtr stream, bool finish)
+{
+ int result = 0;
+ esxStreamPrivate *priv = stream->privateData;
+
+ if (!priv)
+ return 0;
+
+ virMutexLock(&priv->curl->lock);
+
+ if (finish && priv->backlog_used > 0) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Stream has untransferred data left"));
+ result = -1;
+ }
+
+ stream->privateData = NULL;
+
+ virMutexUnlock(&priv->curl->lock);
+
+ esxFreeStreamPrivate(&priv);
+
+ return result;
+}
+
+static int
+esxStreamFinish(virStreamPtr stream)
+{
+ return esxStreamClose(stream, true);
+}
+
+static int
+esxStreamAbort(virStreamPtr stream)
+{
+ return esxStreamClose(stream, false);
+}
+
+virStreamDriver esxStreamDriver = {
+ .streamSend = esxStreamSend,
+ .streamRecv = esxStreamRecv,
+ /* FIXME: streamAddCallback missing */
+ /* FIXME: streamUpdateCallback missing */
+ /* FIXME: streamRemoveCallback missing */
+ .streamFinish = esxStreamFinish,
+ .streamAbort = esxStreamAbort,
+};
+
+static int
+esxStreamOpen(virStreamPtr stream, esxPrivate *priv, const char *url,
+ unsigned long long offset, unsigned long long length, int mode)
+{
+ int result = -1;
+ esxStreamPrivate *streamPriv;
+ char *range = NULL;
+ char *userpwd = NULL;
+ esxVI_MultiCURL *multi = NULL;
+
+ /* FIXME: Although there is already some code in place to deal with
+ * non-blocking streams it is currently incomplete, so usage
+ * of the non-blocking mode is denied here for now. */
+ if (stream->flags & VIR_STREAM_NONBLOCK) {
+ virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+ _("Non-blocking streams are not supported yet"));
+ return -1;
+ }
+
+ if (VIR_ALLOC(streamPriv) < 0)
+ return -1;
+
+ streamPriv->mode = mode;
+
+ if (length > 0) {
+ if (virAsprintf(&range, "%llu-%llu", offset, offset + length - 1) < 0)
+ goto cleanup;
+ } else if (offset > 0) {
+ if (virAsprintf(&range, "%llu-", offset) < 0)
+ goto cleanup;
+ }
+
+ if (esxVI_CURL_Alloc(&streamPriv->curl) < 0 ||
+ esxVI_CURL_Connect(streamPriv->curl, priv->parsedUri) < 0)
+ goto cleanup;
+
+ if (mode == ESX_STREAM_MODE_UPLOAD) {
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 1);
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READFUNCTION,
+ esxVI_CURL_ReadStream);
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READDATA, streamPriv);
+ } else {
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 0);
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_HTTPGET, 1);
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEFUNCTION,
+ esxVI_CURL_WriteStream);
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEDATA, streamPriv);
+ }
+
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_URL, url);
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_RANGE, range);
+
+#if LIBCURL_VERSION_NUM >= 0x071301 /* 7.19.1 */
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERNAME,
+ priv->primary->username);
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_PASSWORD,
+ priv->primary->password);
+#else
+ if (virAsprintf(&userpwd, "%s:%s", priv->primary->username,
+ priv->primary->password) < 0)
+ goto cleanup;
+
+ curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERPWD, userpwd);
+#endif
+
+ if (esxVI_MultiCURL_Alloc(&multi) < 0 ||
+ esxVI_MultiCURL_Add(multi, streamPriv->curl) < 0)
+ goto cleanup;
+
+ stream->driver = &esxStreamDriver;
+ stream->privateData = streamPriv;
+
+ result = 0;
+
+ cleanup:
+ if (result < 0) {
+ if (streamPriv->curl && multi != streamPriv->curl->multi)
+ esxVI_MultiCURL_Free(&multi);
+
+ esxFreeStreamPrivate(&streamPriv);
+ }
+
+ VIR_FREE(range);
+ VIR_FREE(userpwd);
+
+ return result;
+}
+
+int
+esxStreamOpenUpload(virStreamPtr stream, esxPrivate *priv, const char *url)
+{
+ return esxStreamOpen(stream, priv, url, 0, 0, ESX_STREAM_MODE_UPLOAD);
+}
+
+int
+esxStreamOpenDownload(virStreamPtr stream, esxPrivate *priv, const char *url,
+ unsigned long long offset, unsigned long long length)
+{
+ return esxStreamOpen(stream, priv, url, offset, length, ESX_STREAM_MODE_DOWNLOAD);
+}
* esx_vi.c: client for the VMware VI API 2.5 to manage ESX hosts
*
* Copyright (C) 2010-2012 Red Hat, Inc.
- * Copyright (C) 2009-2012 Matthias Bolte <matthias.bolte@googlemail.com>
+ * Copyright (C) 2009-2012, 2014 Matthias Bolte <matthias.bolte@googlemail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
#include <config.h>
+#include <poll.h>
#include <libxml/parser.h>
#include <libxml/xpathInternals.h>
* MultiCURL
*/
+#if ESX_EMULATE_CURL_MULTI_WAIT
+
+static int
+esxVI_MultiCURL_SocketCallback(CURL *handle ATTRIBUTE_UNUSED,
+ curl_socket_t fd, int action,
+ void *callback_opaque,
+ void *socket_opaque ATTRIBUTE_UNUSED)
+{
+ esxVI_MultiCURL *multi = callback_opaque;
+ size_t i;
+ struct pollfd *pollfd = NULL;
+ struct pollfd dummy;
+
+ if (action & CURL_POLL_REMOVE) {
+ for (i = 0; i < multi->npollfds; ++i) {
+ if (multi->pollfds[i].fd == fd) {
+ VIR_DELETE_ELEMENT(multi->pollfds, i, multi->npollfds);
+ break;
+ }
+ }
+ } else {
+ for (i = 0; i < multi->npollfds; ++i) {
+ if (multi->pollfds[i].fd == fd) {
+ pollfd = &multi->pollfds[i];
+ break;
+ }
+ }
+
+ if (pollfd == NULL) {
+ if (VIR_APPEND_ELEMENT(multi->pollfds, multi->npollfds, dummy) < 0)
+ return 0; /* curl_multi_socket() doc says "The callback MUST return 0." */
+
+ pollfd = &multi->pollfds[multi->npollfds - 1];
+ }
+
+ pollfd->fd = fd;
+ pollfd->events = 0;
+
+ if (action & CURL_POLL_IN)
+ pollfd->events |= POLLIN;
+
+ if (action & CURL_POLL_OUT)
+ pollfd->events |= POLLOUT;
+ }
+
+ return 0;
+}
+
+static int
+esxVI_MultiCURL_TimerCallback(CURLM *handle ATTRIBUTE_UNUSED,
+ long timeout_ms, void *callback_opaque)
+{
+ esxVI_MultiCURL *multi = callback_opaque;
+
+ multi->timeoutPending = true;
+
+ return 0;
+}
+
+#endif
+
/* esxVI_MultiCURL_Alloc */
ESX_VI__TEMPLATE__ALLOC(MultiCURL)
if (item->handle) {
curl_multi_cleanup(item->handle);
}
+
+#if ESX_EMULATE_CURL_MULTI_WAIT
+ VIR_FREE(item->pollfds);
+#endif
})
int
_("Could not initialize CURL (multi)"));
return -1;
}
+
+#if ESX_EMULATE_CURL_MULTI_WAIT
+ curl_multi_setopt(multi->handle, CURLMOPT_SOCKETFUNCTION,
+ esxVI_MultiCURL_SocketCallback);
+ curl_multi_setopt(multi->handle, CURLMOPT_SOCKETDATA, multi);
+ curl_multi_setopt(multi->handle, CURLMOPT_TIMERFUNCTION,
+ esxVI_MultiCURL_TimerCallback);
+ curl_multi_setopt(multi->handle, CURLMOPT_TIMERDATA, multi);
+#endif
}
virMutexLock(&curl->lock);
return 0;
}
+#if ESX_EMULATE_CURL_MULTI_WAIT
+
+int
+esxVI_MultiCURL_Wait(esxVI_MultiCURL *multi, int *runningHandles)
+{
+ long timeout = -1;
+ CURLMcode errorCode;
+ int rc;
+ size_t i;
+ int action;
+
+ if (multi->timeoutPending) {
+ do {
+ errorCode = curl_multi_socket_action(multi->handle, CURL_SOCKET_TIMEOUT,
+ 0, runningHandles);
+ } while (errorCode == CURLM_CALL_MULTI_SOCKET);
+
+ if (errorCode != CURLM_OK) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Could not trigger socket action: %s (%d)"),
+ curl_multi_strerror(errorCode), errorCode);
+ return -1;
+ }
+
+ multi->timeoutPending = false;
+ }
+
+ if (multi->npollfds == 0)
+ return 0;
+
+ curl_multi_timeout(multi->handle, &timeout);
+
+ if (timeout < 0) {
+ timeout = 1000; /* default to 1 sec timeout */
+ }
+
+ do {
+ rc = poll(multi->pollfds, multi->npollfds, timeout);
+ } while (rc < 0 && (errno == EAGAIN || errno == EINTR));
+
+ if (rc < 0) {
+ virReportSystemError(errno, "%s", _("Could not wait for transfer"));
+ return -1;
+ }
+
+ for (i = 0; i < multi->npollfds && rc > 0; ++i) {
+ if (multi->pollfds[i].revents == 0)
+ continue;
+
+ --rc;
+ action = 0;
+
+ if (multi->pollfds[i].revents & POLLIN)
+ action |= CURL_POLL_IN;
+
+ if (multi->pollfds[i].revents & POLLOUT)
+ action |= CURL_POLL_OUT;
+
+ do {
+ errorCode = curl_multi_socket_action(multi->handle,
+ multi->pollfds[i].fd, action,
+ runningHandles);
+ } while (errorCode == CURLM_CALL_MULTI_SOCKET);
+
+ if (errorCode != CURLM_OK) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Could not trigger socket action: %s (%d)"),
+ curl_multi_strerror(errorCode), errorCode);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+#else
+
+int
+esxVI_MultiCURL_Wait(esxVI_MultiCURL *multi, int *runningHandles)
+{
+ long timeout = -1;
+ CURLMcode errorCode;
+
+ curl_multi_timeout(multi->handle, &timeout);
+
+ if (timeout < 0)
+ timeout = 1000; /* default to 1 sec timeout */
+
+ errorCode = curl_multi_wait(multi->handle, NULL, 0, timeout, NULL);
+
+ if (errorCode != CURLM_OK) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Could not wait for transfer: %s (%d)"),
+ curl_multi_strerror(errorCode), errorCode);
+ return -1;
+ }
+
+ return esxVI_MultiCURL_Perform(multi, runningHandles);
+}
+
+#endif
+
+int
+esxVI_MultiCURL_Perform(esxVI_MultiCURL *multi, int *runningHandles)
+{
+ CURLMcode errorCode;
+
+ do {
+ errorCode = curl_multi_perform(multi->handle, runningHandles);
+ } while (errorCode == CURLM_CALL_MULTI_PERFORM);
+
+ if (errorCode != CURLM_OK) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Could not transfer data: %s (%d)"),
+ curl_multi_strerror(errorCode), errorCode);
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Returns -1 on error, 0 if there is no DONE message, 1 if there is a DONE message */
+int
+esxVI_MultiCURL_CheckFirstMessage(esxVI_MultiCURL *multi, long *responseCode,
+ CURLcode *errorCode)
+{
+ int messagesInQueue;
+ CURLMsg* msg = curl_multi_info_read(multi->handle, &messagesInQueue);
+
+ *responseCode = 0;
+
+ if (!msg || msg->msg != CURLMSG_DONE)
+ return 0;
+
+ *errorCode = msg->data.result;
+
+ if (*errorCode != CURLE_OK)
+ return -1;
+
+ curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, responseCode);
+
+ return 1;
+}
+
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *