--- /dev/null
+/*
+ * fdstream.h: generic streams impl for file descriptors
+ *
+ * Copyright (C) 2009-2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#include <config.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#if HAVE_SYS_UN_H
+# include <sys/un.h>
+#endif
+#include <netinet/in.h>
+
+#include "fdstream.h"
+#include "virterror_internal.h"
+#include "datatypes.h"
+#include "memory.h"
+#include "event.h"
+#include "util.h"
+#include "files.h"
+
+#define VIR_FROM_THIS VIR_FROM_STREAMS
+#define streamsReportError(code, ...) \
+ virReportErrorHelper(NULL, VIR_FROM_THIS, code, __FILE__, \
+ __FUNCTION__, __LINE__, __VA_ARGS__)
+
+/* Tunnelled migration stream support */
+struct virFDStreamData {
+ int fd;
+
+ int watch;
+ unsigned int cbRemoved;
+ unsigned int dispatching;
+ virStreamEventCallback cb;
+ void *opaque;
+ virFreeCallback ff;
+
+ virMutex lock;
+};
+
+static int virFDStreamRemoveCallback(virStreamPtr stream)
+{
+ struct virFDStreamData *fdst = stream->privateData;
+ int ret = -1;
+
+ if (!fdst) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream is not open"));
+ return -1;
+ }
+
+ virMutexLock(&fdst->lock);
+ if (fdst->watch == 0) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream does not have a callback registered"));
+ goto cleanup;
+ }
+
+ virEventRemoveHandle(fdst->watch);
+ if (fdst->dispatching)
+ fdst->cbRemoved = 1;
+ else if (fdst->ff)
+ (fdst->ff)(fdst->opaque);
+
+ fdst->watch = 0;
+ fdst->ff = NULL;
+ fdst->cb = NULL;
+ fdst->opaque = NULL;
+
+ ret = 0;
+
+cleanup:
+ virMutexUnlock(&fdst->lock);
+ return ret;
+}
+
+static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
+{
+ struct virFDStreamData *fdst = stream->privateData;
+ int ret = -1;
+
+ if (!fdst) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream is not open"));
+ return -1;
+ }
+
+ virMutexLock(&fdst->lock);
+ if (fdst->watch == 0) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream does not have a callback registered"));
+ goto cleanup;
+ }
+
+ virEventUpdateHandle(fdst->watch, events);
+
+ ret = 0;
+
+cleanup:
+ virMutexUnlock(&fdst->lock);
+ return ret;
+}
+
+static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED,
+ int fd ATTRIBUTE_UNUSED,
+ int events,
+ void *opaque)
+{
+ virStreamPtr stream = opaque;
+ struct virFDStreamData *fdst = stream->privateData;
+ virStreamEventCallback cb;
+ void *cbopaque;
+ virFreeCallback ff;
+
+ if (!fdst)
+ return;
+
+ virMutexLock(&fdst->lock);
+ if (!fdst->cb) {
+ virMutexUnlock(&fdst->lock);
+ return;
+ }
+
+ cb = fdst->cb;
+ cbopaque = fdst->opaque;
+ ff = fdst->ff;
+ fdst->dispatching = 1;
+ virMutexUnlock(&fdst->lock);
+
+ cb(stream, events, cbopaque);
+
+ virMutexLock(&fdst->lock);
+ fdst->dispatching = 0;
+ if (fdst->cbRemoved && ff)
+ (ff)(cbopaque);
+ virMutexUnlock(&fdst->lock);
+}
+
+static int
+virFDStreamAddCallback(virStreamPtr st,
+ int events,
+ virStreamEventCallback cb,
+ void *opaque,
+ virFreeCallback ff)
+{
+ struct virFDStreamData *fdst = st->privateData;
+ int ret = -1;
+
+ if (!fdst) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream is not open"));
+ return -1;
+ }
+
+ virMutexLock(&fdst->lock);
+ if (fdst->watch != 0) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream already has a callback registered"));
+ goto cleanup;
+ }
+
+ if ((fdst->watch = virEventAddHandle(fdst->fd,
+ events,
+ virFDStreamEvent,
+ st,
+ NULL)) < 0) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("cannot register file watch on stream"));
+ goto cleanup;
+ }
+
+ fdst->cbRemoved = 0;
+ fdst->cb = cb;
+ fdst->opaque = opaque;
+ fdst->ff = ff;
+ virStreamRef(st);
+
+ ret = 0;
+
+cleanup:
+ virMutexUnlock(&fdst->lock);
+ return ret;
+}
+
+static int virFDStreamFree(struct virFDStreamData *fdst)
+{
+ int ret;
+ ret = VIR_CLOSE(fdst->fd);
+ VIR_FREE(fdst);
+ return ret;
+}
+
+
+static int
+virFDStreamClose(virStreamPtr st)
+{
+ struct virFDStreamData *fdst = st->privateData;
+ int ret;
+
+ if (!fdst)
+ return 0;
+
+ virMutexLock(&fdst->lock);
+
+ ret = virFDStreamFree(fdst);
+
+ st->privateData = NULL;
+
+ virMutexUnlock(&fdst->lock);
+
+ return ret;
+}
+
+static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
+{
+ struct virFDStreamData *fdst = st->privateData;
+ int ret;
+
+ if (nbytes > INT_MAX) {
+ virReportSystemError(ERANGE, "%s",
+ _("Too many bytes to write to stream"));
+ return -1;
+ }
+
+ if (!fdst) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream is not open"));
+ return -1;
+ }
+
+ virMutexLock(&fdst->lock);
+
+retry:
+ ret = write(fdst->fd, bytes, nbytes);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ ret = -2;
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ ret = -1;
+ virReportSystemError(errno, "%s",
+ _("cannot write to stream"));
+ }
+ }
+
+ virMutexUnlock(&fdst->lock);
+ return ret;
+}
+
+
+static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
+{
+ struct virFDStreamData *fdst = st->privateData;
+ int ret;
+
+ if (nbytes > INT_MAX) {
+ virReportSystemError(ERANGE, "%s",
+ _("Too many bytes to read from stream"));
+ return -1;
+ }
+
+ if (!fdst) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("stream is not open"));
+ return -1;
+ }
+
+ virMutexLock(&fdst->lock);
+
+retry:
+ ret = read(fdst->fd, bytes, nbytes);
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ ret = -2;
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ ret = -1;
+ virReportSystemError(errno, "%s",
+ _("cannot read from stream"));
+ }
+ }
+
+ virMutexUnlock(&fdst->lock);
+ return ret;
+}
+
+
+static virStreamDriver virFDStreamDrv = {
+ .streamSend = virFDStreamWrite,
+ .streamRecv = virFDStreamRead,
+ .streamFinish = virFDStreamClose,
+ .streamAbort = virFDStreamClose,
+ .streamAddCallback = virFDStreamAddCallback,
+ .streamUpdateCallback = virFDStreamUpdateCallback,
+ .streamRemoveCallback = virFDStreamRemoveCallback
+};
+
+int virFDStreamOpen(virStreamPtr st,
+ int fd)
+{
+ struct virFDStreamData *fdst;
+
+ if ((st->flags & VIR_STREAM_NONBLOCK) &&
+ virSetNonBlock(fd) < 0)
+ return -1;
+
+ if (VIR_ALLOC(fdst) < 0) {
+ virReportOOMError();
+ return -1;
+ }
+
+ fdst->fd = fd;
+ if (virMutexInit(&fdst->lock) < 0) {
+ VIR_FREE(fdst);
+ streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Unable to initialize mutex"));
+ return -1;
+ }
+
+ st->driver = &virFDStreamDrv;
+ st->privateData = fdst;
+
+ return 0;
+}
+
+
+#if HAVE_SYS_UN_H
+int virFDStreamConnectUNIX(virStreamPtr st,
+ const char *path,
+ bool abstract)
+{
+ struct sockaddr_un sa;
+ int i = 0;
+ int timeout = 3;
+ int ret;
+
+ int fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0) {
+ virReportSystemError(errno, "%s", _("Unable to open UNIX socket"));
+ goto error;
+ }
+
+ memset(&sa, 0, sizeof(sa));
+ sa.sun_family = AF_UNIX;
+ if (abstract) {
+ if (virStrcpy(sa.sun_path+1, path, sizeof(sa.sun_path)-1) == NULL)
+ goto error;
+ sa.sun_path[0] = '\0';
+ } else {
+ if (virStrcpy(sa.sun_path, path, sizeof(sa.sun_path)) == NULL)
+ goto error;
+ }
+
+ do {
+ ret = connect(fd, (struct sockaddr *)&sa, sizeof(sa));
+ if (ret == 0)
+ break;
+
+ if (errno == ENOENT || errno == ECONNREFUSED) {
+ /* ENOENT : Socket may not have shown up yet
+ * ECONNREFUSED : Leftover socket hasn't been removed yet */
+ continue;
+ }
+
+ goto error;
+ } while ((++i <= timeout*5) && (usleep(.2 * 1000000) <= 0));
+
+ if (virFDStreamOpen(st, fd) < 0)
+ goto error;
+ return 0;
+
+error:
+ VIR_FORCE_CLOSE(fd);
+ return -1;
+}
+#else
+int virFDStreamConnectUNIX(virStreamPtr st ATTRIBUTE_UNUSED,
+ const char *path ATTRIBUTE_UNUSED,
+ bool abstract ATTRIBUTE_UNUSED)
+{
+ virReportSystemError(ENOSYS, "%s",
+ _("UNIX domain sockets are not supported on this platform"));
+ return -1;
+}
+#endif
+
+int virFDStreamOpenFile(virStreamPtr st,
+ const char *path,
+ int flags)
+{
+ int fd;
+ struct stat sb;
+
+ if (flags & O_CREAT) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Unexpected O_CREAT flag when opening existing file"));
+ }
+
+ if ((fd = open(path, flags)) < 0) {
+ virReportSystemError(errno,
+ _("Unable to open stream for '%s'"),
+ path);
+ return -1;
+ }
+
+ if (fstat(fd, &sb) < 0) {
+ virReportSystemError(errno,
+ _("Unable to access stream for '%s'"),
+ path);
+ goto error;
+ }
+
+ /* Thanks to the POSIX i/o model, we can't reliably get
+ * non-blocking I/O on block devs/regular files. To
+ * support those we need to fork a helper process todo
+ * the I/O so we just have a fifo. Or use AIO :-(
+ */
+ if ((st->flags & VIR_STREAM_NONBLOCK) &&
+ (!S_ISCHR(sb.st_mode) &&
+ !S_ISFIFO(sb.st_mode))) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Non-blocking I/O is not supported on %s"),
+ path);
+ goto error;
+ }
+
+ if (virFDStreamOpen(st, fd) < 0)
+ goto error;
+
+ return 0;
+
+error:
+ close(fd);
+ return -1;
+}
+
+int virFDStreamCreateFile(virStreamPtr st,
+ const char *path,
+ int flags,
+ mode_t mode)
+{
+ int fd = open(path, flags, mode);
+ struct stat sb;
+
+ if (fd < 0) {
+ virReportSystemError(errno,
+ _("Unable to open stream for '%s'"),
+ path);
+ return -1;
+ }
+
+ if (fstat(fd, &sb) < 0) {
+ virReportSystemError(errno,
+ _("Unable to access stream for '%s'"),
+ path);
+ goto error;
+ }
+
+ /* Thanks to the POSIX i/o model, we can't reliably get
+ * non-blocking I/O on block devs/regular files. To
+ * support those we need to fork a helper process todo
+ * the I/O so we just have a fifo. Or use AIO :-(
+ */
+ if ((st->flags & VIR_STREAM_NONBLOCK) &&
+ (!S_ISCHR(sb.st_mode) &&
+ !S_ISFIFO(sb.st_mode))) {
+ streamsReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Non-blocking I/O is not supported on %s"),
+ path);
+ goto error;
+ }
+
+ if (virFDStreamOpen(st, fd) < 0)
+ goto error;
+
+ return 0;
+
+error:
+ close(fd);
+ return -1;
+}
#include "storage_file.h"
#include "virtaudit.h"
#include "files.h"
+#include "fdstream.h"
#define VIR_FROM_THIS VIR_FROM_QEMU
return true;
}
-
-/* Tunnelled migration stream support */
-struct qemuStreamMigFile {
- int fd;
-
- int watch;
- unsigned int cbRemoved;
- unsigned int dispatching;
- virStreamEventCallback cb;
- void *opaque;
- virFreeCallback ff;
-};
-
-static int qemuStreamMigRemoveCallback(virStreamPtr stream)
-{
- struct qemud_driver *driver = stream->conn->privateData;
- struct qemuStreamMigFile *qemust = stream->privateData;
- int ret = -1;
-
- if (!qemust) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("stream is not open"));
- return -1;
- }
-
- qemuDriverLock(driver);
- if (qemust->watch == 0) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("stream does not have a callback registered"));
- goto cleanup;
- }
-
- virEventRemoveHandle(qemust->watch);
- if (qemust->dispatching)
- qemust->cbRemoved = 1;
- else if (qemust->ff)
- (qemust->ff)(qemust->opaque);
-
- qemust->watch = 0;
- qemust->ff = NULL;
- qemust->cb = NULL;
- qemust->opaque = NULL;
-
- ret = 0;
-
-cleanup:
- qemuDriverUnlock(driver);
- return ret;
-}
-
-static int qemuStreamMigUpdateCallback(virStreamPtr stream, int events)
-{
- struct qemud_driver *driver = stream->conn->privateData;
- struct qemuStreamMigFile *qemust = stream->privateData;
- int ret = -1;
-
- if (!qemust) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("stream is not open"));
- return -1;
- }
-
- qemuDriverLock(driver);
- if (qemust->watch == 0) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("stream does not have a callback registered"));
- goto cleanup;
- }
-
- virEventUpdateHandle(qemust->watch, events);
-
- ret = 0;
-
-cleanup:
- qemuDriverUnlock(driver);
- return ret;
-}
-
-static void qemuStreamMigEvent(int watch ATTRIBUTE_UNUSED,
- int fd ATTRIBUTE_UNUSED,
- int events,
- void *opaque)
-{
- virStreamPtr stream = opaque;
- struct qemud_driver *driver = stream->conn->privateData;
- struct qemuStreamMigFile *qemust = stream->privateData;
- virStreamEventCallback cb;
- void *cbopaque;
- virFreeCallback ff;
-
- qemuDriverLock(driver);
- if (!qemust || !qemust->cb) {
- qemuDriverUnlock(driver);
- return;
- }
-
- cb = qemust->cb;
- cbopaque = qemust->opaque;
- ff = qemust->ff;
- qemust->dispatching = 1;
- qemuDriverUnlock(driver);
-
- cb(stream, events, cbopaque);
-
- qemuDriverLock(driver);
- qemust->dispatching = 0;
- if (qemust->cbRemoved && ff)
- (ff)(cbopaque);
- qemuDriverUnlock(driver);
-}
-
-static int
-qemuStreamMigAddCallback(virStreamPtr st,
- int events,
- virStreamEventCallback cb,
- void *opaque,
- virFreeCallback ff)
-{
- struct qemud_driver *driver = st->conn->privateData;
- struct qemuStreamMigFile *qemust = st->privateData;
- int ret = -1;
-
- if (!qemust) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("stream is not open"));
- return -1;
- }
-
- qemuDriverLock(driver);
- if (qemust->watch != 0) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("stream already has a callback registered"));
- goto cleanup;
- }
-
- if ((qemust->watch = virEventAddHandle(qemust->fd,
- events,
- qemuStreamMigEvent,
- st,
- NULL)) < 0) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("cannot register file watch on stream"));
- goto cleanup;
- }
-
- qemust->cbRemoved = 0;
- qemust->cb = cb;
- qemust->opaque = opaque;
- qemust->ff = ff;
- virStreamRef(st);
-
- ret = 0;
-
-cleanup:
- qemuDriverUnlock(driver);
- return ret;
-}
-
-static void qemuStreamMigFree(struct qemuStreamMigFile *qemust)
-{
- VIR_FORCE_CLOSE(qemust->fd);
- VIR_FREE(qemust);
-}
-
-static struct qemuStreamMigFile *qemuStreamMigOpen(virStreamPtr st,
- const char *unixfile)
-{
- struct qemuStreamMigFile *qemust = NULL;
- struct sockaddr_un sa_qemu;
- int i = 0;
- int timeout = 3;
- int ret;
-
- if (VIR_ALLOC(qemust) < 0) {
- virReportOOMError();
- return NULL;
- }
-
- qemust->fd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (qemust->fd < 0)
- goto cleanup;
-
- memset(&sa_qemu, 0, sizeof(sa_qemu));
- sa_qemu.sun_family = AF_UNIX;
- if (virStrcpy(sa_qemu.sun_path, unixfile, sizeof(sa_qemu.sun_path)) == NULL)
- goto cleanup;
-
- do {
- ret = connect(qemust->fd, (struct sockaddr *)&sa_qemu, sizeof(sa_qemu));
- if (ret == 0)
- break;
-
- if (errno == ENOENT || errno == ECONNREFUSED) {
- /* ENOENT : Socket may not have shown up yet
- * ECONNREFUSED : Leftover socket hasn't been removed yet */
- continue;
- }
-
- goto cleanup;
- } while ((++i <= timeout*5) && (usleep(.2 * 1000000) <= 0));
-
- if ((st->flags & VIR_STREAM_NONBLOCK) && virSetNonBlock(qemust->fd) < 0)
- goto cleanup;
-
- return qemust;
-
-cleanup:
- qemuStreamMigFree(qemust);
- return NULL;
-}
-
-static int
-qemuStreamMigClose(virStreamPtr st)
-{
- struct qemud_driver *driver = st->conn->privateData;
- struct qemuStreamMigFile *qemust = st->privateData;
-
- if (!qemust)
- return 0;
-
- qemuDriverLock(driver);
-
- qemuStreamMigFree(qemust);
-
- st->privateData = NULL;
-
- qemuDriverUnlock(driver);
-
- return 0;
-}
-
-static int qemuStreamMigWrite(virStreamPtr st, const char *bytes, size_t nbytes)
-{
- struct qemud_driver *driver = st->conn->privateData;
- struct qemuStreamMigFile *qemust = st->privateData;
- int ret;
-
- if (!qemust) {
- qemuReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("stream is not open"));
- return -1;
- }
-
- qemuDriverLock(driver);
-
-retry:
- ret = write(qemust->fd, bytes, nbytes);
- if (ret < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- ret = -2;
- } else if (errno == EINTR) {
- goto retry;
- } else {
- ret = -1;
- virReportSystemError(errno, "%s",
- _("cannot write to stream"));
- }
- }
-
- qemuDriverUnlock(driver);
- return ret;
-}
-
-static virStreamDriver qemuStreamMigDrv = {
- .streamSend = qemuStreamMigWrite,
- .streamFinish = qemuStreamMigClose,
- .streamAbort = qemuStreamMigClose,
- .streamAddCallback = qemuStreamMigAddCallback,
- .streamUpdateCallback = qemuStreamMigUpdateCallback,
- .streamRemoveCallback = qemuStreamMigRemoveCallback
-};
-
/* Prepare is the first step, and it runs on the destination host.
*
* This version starts an empty VM listening on a localhost TCP port, and
int internalret;
char *unixfile = NULL;
unsigned long long qemuCmdFlags;
- struct qemuStreamMigFile *qemust = NULL;
qemuDomainObjPrivatePtr priv = NULL;
struct timeval now;
goto endjob;
}
- qemust = qemuStreamMigOpen(st, unixfile);
- if (qemust == NULL) {
+ if (virFDStreamConnectUNIX(st,
+ unixfile,
+ false) < 0) {
qemuDomainStartAudit(vm, "migrated", false);
qemudShutdownVMDaemon(driver, vm, 0);
if (!vm->persistent) {
goto endjob;
}
- st->driver = &qemuStreamMigDrv;
- st->privateData = qemust;
-
qemuDomainStartAudit(vm, "migrated", true);
+
event = virDomainEventNewFromObj(vm,
VIR_DOMAIN_EVENT_STARTED,
VIR_DOMAIN_EVENT_STARTED_MIGRATED);