#include "virpidfile.h"
#include "virprocess.h"
#include "virbuffer.h"
+#include "virthread.h"
#define VIR_FROM_THIS VIR_FROM_NONE
char **errbuf;
int infd;
+ int inpipe;
int outfd;
int errfd;
int *outfdptr;
int *errfdptr;
- size_t inbufOffset;
- int inWatch;
- int outWatch;
- int errWatch;
+ virThreadPtr asyncioThread;
bool handshake;
int handshakeWait[2];
cmd->handshakeNotify[0] = -1;
cmd->handshakeNotify[1] = -1;
- cmd->infd = cmd->outfd = cmd->errfd = -1;
- cmd->inWatch = cmd->outWatch = cmd->errWatch = -1;
+ cmd->infd = cmd->inpipe = cmd->outfd = cmd->errfd = -1;
cmd->pid = -1;
virCommandAddArgSet(cmd, args);
* Manage input and output to the child process.
*/
static int
-virCommandProcessIO(virCommandPtr cmd, int *inpipe)
+virCommandProcessIO(virCommandPtr cmd)
{
- int infd = -1, outfd = -1, errfd = -1;
+ int outfd = -1, errfd = -1;
size_t inlen = 0, outlen = 0, errlen = 0;
size_t inoff = 0;
int ret = 0;
/* With an input buffer, feed data to child
* via pipe */
- if (cmd->inbuf) {
+ if (cmd->inbuf)
inlen = strlen(cmd->inbuf);
- infd = *inpipe;
- }
/* With out/err buffer, the outfd/errfd have been filled with an
* FD for us. Guarantee an allocated string with partial results
struct pollfd fds[3];
int nfds = 0;
- if (infd != -1) {
- fds[nfds].fd = infd;
+ if (cmd->inpipe != -1) {
+ fds[nfds].fd = cmd->inpipe;
fds[nfds].events = POLLOUT;
fds[nfds].revents = 0;
nfds++;
}
if (fds[i].revents & (POLLOUT | POLLERR) &&
- fds[i].fd == infd) {
+ fds[i].fd == cmd->inpipe) {
int done;
/* Coverity 5.3.0 can't see that we only get here if
* infd is in the set because it was non-negative. */
sa_assert(infd != -1);
- done = write(infd, cmd->inbuf + inoff,
+ done = write(cmd->inpipe, cmd->inbuf + inoff,
inlen - inoff);
if (done < 0) {
if (errno == EPIPE) {
VIR_DEBUG("child closed stdin early, ignoring EPIPE "
- "on fd %d", infd);
- if (VIR_CLOSE(*inpipe) < 0)
- VIR_DEBUG("ignoring failed close on fd %d", infd);
- infd = -1;
+ "on fd %d", cmd->inpipe);
+ VIR_FORCE_CLOSE(cmd->inpipe);
} else if (errno != EINTR && errno != EAGAIN) {
virReportSystemError(errno, "%s",
_("unable to write to child input"));
}
} else {
inoff += done;
- if (inoff == inlen) {
- if (VIR_CLOSE(*inpipe) < 0)
- VIR_DEBUG("ignoring failed close on fd %d", infd);
- infd = -1;
- }
+ if (inoff == inlen)
+ VIR_FORCE_CLOSE(cmd->inpipe);
}
}
}
int ret = 0;
char *outbuf = NULL;
char *errbuf = NULL;
- int infd[2] = { -1, -1 };
struct stat st;
bool string_io;
bool async_io = false;
}
}
- /* If we have an input buffer, we need
- * a pipe to feed the data to the child */
- if (cmd->inbuf) {
- if (pipe2(infd, O_CLOEXEC) < 0) {
- virReportSystemError(errno, "%s",
- _("unable to open pipe"));
- cmd->has_error = -1;
- return -1;
- }
- cmd->infd = infd[0];
- }
-
/* If caller requested the same string for stdout and stderr, then
* merge those into one string. */
if (cmd->outbuf && cmd->outbuf == cmd->errbuf) {
cmd->flags |= VIR_EXEC_RUN_SYNC;
if (virCommandRunAsync(cmd, NULL) < 0) {
- if (cmd->inbuf) {
- tmpfd = infd[0];
- if (VIR_CLOSE(infd[0]) < 0)
- VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
- tmpfd = infd[1];
- if (VIR_CLOSE(infd[1]) < 0)
- VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
- }
cmd->has_error = -1;
return -1;
}
- tmpfd = infd[0];
- if (VIR_CLOSE(infd[0]) < 0)
- VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
- if (string_io)
- ret = virCommandProcessIO(cmd, &infd[1]);
+ if (string_io) {
+ VIR_FORCE_CLOSE(cmd->infd);
+ ret = virCommandProcessIO(cmd);
+ }
if (virCommandWait(cmd, exitstatus) < 0)
ret = -1;
/* Reset any capturing, in case caller runs
* this identical command again */
- if (cmd->inbuf) {
- tmpfd = infd[1];
- if (VIR_CLOSE(infd[1]) < 0)
- VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
- }
+ VIR_FORCE_CLOSE(cmd->inpipe);
if (cmd->outbuf == &outbuf) {
tmpfd = cmd->outfd;
if (VIR_CLOSE(cmd->outfd) < 0)
static void
-virCommandHandleReadWrite(int watch, int fd, int events, void *opaque)
-{
- virCommandPtr cmd = (virCommandPtr) opaque;
- char ***bufptr = NULL;
- char buf[1024];
- ssize_t nread, nwritten;
- size_t len = 0;
- int *watchPtr = NULL;
- bool eof = false;
- int *fdptr = NULL, **fdptrptr = NULL;
-
- VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events);
- errno = 0;
-
- if (watch == cmd->inWatch) {
- watchPtr = &cmd->inWatch;
- fdptr = &cmd->infd;
-
- if (events & VIR_EVENT_HANDLE_WRITABLE) {
- len = strlen(cmd->inbuf);
-
- while (true) {
- nwritten = write(fd, cmd->inbuf + cmd->inbufOffset,
- len - cmd->inbufOffset);
- if (nwritten < 0) {
- if (errno != EAGAIN && errno != EINTR) {
- virReportSystemError(errno,
- _("Unable to write command's "
- "input to FD %d"),
- fd);
- eof = true;
- }
- break;
- }
-
- if (nwritten == 0) {
- eof = true;
- break;
- }
-
- cmd->inbufOffset += nwritten;
- if (cmd->inbufOffset == len) {
- VIR_FORCE_CLOSE(cmd->infd);
- eof = true;
- break;
- }
- }
-
- }
- } else {
- if (watch == cmd->outWatch) {
- watchPtr = &cmd->outWatch;
- bufptr = &cmd->outbuf;
- fdptr = &cmd->outfd;
- fdptrptr = &cmd->outfdptr;
- } else {
- watchPtr = &cmd->errWatch;
- bufptr = &cmd->errbuf;
- fdptr = &cmd->errfd;
- fdptrptr = &cmd->errfdptr;
- }
-
- if (events & VIR_EVENT_HANDLE_READABLE) {
- if (**bufptr)
- len = strlen(**bufptr);
-
- while (true) {
- nread = read(fd, buf, sizeof(buf));
- if (nread < 0) {
- if (errno != EAGAIN && errno != EINTR) {
- virReportSystemError(errno,
- _("unable to read command's "
- "output from FD %d"),
- fd);
- eof = true;
- }
- break;
- }
-
- if (nread == 0) {
- eof = true;
- break;
- }
-
- if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) {
- virReportOOMError();
- break;
- }
-
- memcpy(**bufptr + len, buf, nread);
- (**bufptr)[len + nread] = '\0';
- }
-
- }
- }
-
- if (eof || (events & VIR_EVENT_HANDLE_HANGUP) ||
- (events & VIR_EVENT_HANDLE_ERROR)) {
- virEventRemoveHandle(watch);
-
- *watchPtr = -1;
- VIR_FORCE_CLOSE(*fdptr);
- if (bufptr)
- *bufptr = NULL;
- if (fdptrptr)
- *fdptrptr = NULL;
- }
-}
-
-
-static int
-virCommandRegisterEventLoop(virCommandPtr cmd)
+virCommandDoAsyncIOHelper(void *opaque)
{
- int ret = -1;
-
- if (cmd->inbuf &&
- (cmd->inWatch = virEventAddHandle(cmd->infd,
- VIR_EVENT_HANDLE_WRITABLE |
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR,
- virCommandHandleReadWrite,
- cmd, NULL)) < 0) {
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("Unable to register infd %d in the event loop"),
- cmd->infd);
- goto cleanup;
- }
-
- if (cmd->outbuf && cmd->outfdptr == &cmd->outfd &&
- (cmd->outWatch = virEventAddHandle(cmd->outfd,
- VIR_EVENT_HANDLE_READABLE |
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR,
- virCommandHandleReadWrite,
- cmd, NULL)) < 0) {
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("Unable to register outfd %d in the event loop"),
- cmd->outfd);
-
- if (cmd->inWatch != -1) {
- virEventRemoveHandle(cmd->inWatch);
- cmd->inWatch = -1;
- }
- goto cleanup;
+ virCommandPtr cmd = opaque;
+ if (virCommandProcessIO(cmd) < 0) {
+ /* If something went wrong, save errno or -1*/
+ cmd->has_error = errno ? errno : -1;
}
-
- if (cmd->errbuf && cmd->errfdptr == &cmd->errfd &&
- (cmd->errWatch = virEventAddHandle(cmd->errfd,
- VIR_EVENT_HANDLE_READABLE |
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR,
- virCommandHandleReadWrite,
- cmd, NULL)) < 0) {
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("Unable to register errfd %d in the event loop"),
- cmd->errfd);
- if (cmd->inWatch != -1) {
- virEventRemoveHandle(cmd->inWatch);
- cmd->inWatch = -1;
- }
- if (cmd->outWatch != -1) {
- virEventRemoveHandle(cmd->outWatch);
- cmd->outWatch = -1;
- }
- goto cleanup;
- }
-
- ret = 0;
-
-cleanup:
- return ret;
}
int
virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
{
- int ret;
+ int ret = -1;
char *str;
int i;
bool synchronous = false;
synchronous = cmd->flags & VIR_EXEC_RUN_SYNC;
cmd->flags &= ~VIR_EXEC_RUN_SYNC;
- /* Buffer management can only be requested via virCommandRun, unless help
- * from the event loop has been requested via virCommandDoAsyncIO. */
- if (cmd->flags & VIR_EXEC_ASYNC_IO) {
- /* If we have an input buffer, we need
- * a pipe to feed the data to the child */
- if (cmd->inbuf && cmd->infd == -1) {
- if (pipe2(infd, O_CLOEXEC) < 0) {
- virReportSystemError(errno, "%s",
- _("unable to open pipe"));
- cmd->has_error = -1;
- return -1;
- }
- cmd->infd = infd[0];
+ /* Buffer management can only be requested via virCommandRun or
+ * virCommandDoAsyncIO. */
+ if (cmd->inbuf && cmd->infd == -1 &&
+ (synchronous || cmd->flags & VIR_EXEC_ASYNC_IO)) {
+ if (pipe2(infd, O_CLOEXEC) < 0) {
+ virReportSystemError(errno, "%s",
+ _("unable to open pipe"));
+ cmd->has_error = -1;
+ return -1;
}
+ cmd->infd = infd[0];
+ cmd->inpipe = infd[1];
} else if ((cmd->inbuf && cmd->infd == -1) ||
- (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
- (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
+ (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
+ (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot mix string I/O with asynchronous command"));
return -1;
virReportError(VIR_ERR_INTERNAL_ERROR,
_("command is already running as pid %lld"),
(long long) cmd->pid);
- return -1;
+ goto cleanup;
}
if (!synchronous && (cmd->flags & VIR_EXEC_DAEMON)) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("daemonized command cannot use virCommandRunAsync"));
- return -1;
+ goto cleanup;
}
if (cmd->pwd && (cmd->flags & VIR_EXEC_DAEMON)) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("daemonized command cannot set working directory %s"),
cmd->pwd);
- return -1;
+ goto cleanup;
}
if (cmd->pidfile && !(cmd->flags & VIR_EXEC_DAEMON)) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("creation of pid file requires daemonized command"));
- return -1;
+ goto cleanup;
}
str = virCommandToString(cmd);
cmd->reap = true;
if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) {
- cmd->flags &= ~VIR_EXEC_ASYNC_IO;
- if (cmd->inbuf && cmd->infd != -1) {
- /* close the read end of infd and replace it with the write end */
+ if (cmd->inbuf)
VIR_FORCE_CLOSE(cmd->infd);
- cmd->infd = infd[1];
+ /* clear any error so we can catch if the helper thread reports one */
+ cmd->has_error = 0;
+ if (VIR_ALLOC(cmd->asyncioThread) < 0 ||
+ virThreadCreate(cmd->asyncioThread, true,
+ virCommandDoAsyncIOHelper, cmd) < 0) {
+ virReportSystemError(errno, "%s",
+ _("Unable to create thread "
+ "to process command's IO"));
+ VIR_FREE(cmd->asyncioThread);
+ virCommandAbort(cmd);
+ ret = -1;
}
- ret = virCommandRegisterEventLoop(cmd);
}
+cleanup:
+ if (ret < 0) {
+ VIR_FORCE_CLOSE(cmd->infd);
+ VIR_FORCE_CLOSE(cmd->inpipe);
+ }
return ret;
}
{
int ret;
int status = 0;
- const int events = VIR_EVENT_HANDLE_READABLE | VIR_EVENT_HANDLE_HANGUP;
if (!cmd ||cmd->has_error == ENOMEM) {
virReportOOMError();
* guarantee that virProcessWait only fails due to failure to wait,
* and repeat the exitstatus check code ourselves. */
ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status);
-
- if (cmd->inWatch != -1) {
- virEventRemoveHandle(cmd->inWatch);
- cmd->inWatch = -1;
- }
-
- if (cmd->outWatch != -1) {
- virEventRemoveHandle(cmd->outWatch);
- virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd);
- cmd->outWatch = -1;
- }
-
- if (cmd->errWatch != -1) {
- virEventRemoveHandle(cmd->errWatch);
- virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd);
- cmd->errWatch = -1;
+ if (cmd->flags & VIR_EXEC_ASYNC_IO) {
+ cmd->flags &= ~VIR_EXEC_ASYNC_IO;
+ virThreadJoin(cmd->asyncioThread);
+ VIR_FREE(cmd->asyncioThread);
+ VIR_FORCE_CLOSE(cmd->inpipe);
+ if (cmd->has_error) {
+ const char *msg = _("Error while processing command's IO");
+ if (cmd->has_error < 0)
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s", msg);
+ else
+ virReportSystemError(cmd->has_error, "%s", msg);
+ ret = -1;
+ }
}
-
if (ret == 0) {
cmd->pid = -1;
cmd->reap = false;
VIR_FORCE_CLOSE(cmd->transfer[i]);
}
+ if (cmd->asyncioThread) {
+ virThreadJoin(cmd->asyncioThread);
+ VIR_FREE(cmd->asyncioThread);
+ }
VIR_FREE(cmd->inbuf);
VIR_FORCE_CLOSE(cmd->outfd);
VIR_FORCE_CLOSE(cmd->errfd);