#include <config.h>
#include <unistd.h>
-#include <poll.h>
#include <signal.h>
#include <fcntl.h>
#include "virerror.h"
#include "virprobe.h"
#include "virstring.h"
+#include "vireventglibwatch.h"
#define VIR_FROM_THIS VIR_FROM_RPC
virNetSASLSessionPtr sasl;
#endif
- /* Self-pipe to wakeup threads waiting in poll() */
- int wakeupSendFD;
- int wakeupReadFD;
+ GMainLoop *eventLoop;
+ GMainContext *eventCtx;
/*
* List of calls currently waiting for dispatch
const char *hostname)
{
virNetClientPtr client = NULL;
- int wakeupFD[2] = { -1, -1 };
if (virNetClientInitialize() < 0)
goto error;
- if (pipe2(wakeupFD, O_CLOEXEC) < 0) {
- virReportSystemError(errno, "%s",
- _("unable to make pipe"));
- goto error;
- }
-
if (!(client = virObjectLockableNew(virNetClientClass)))
goto error;
client->sock = sock;
sock = NULL;
- client->wakeupReadFD = wakeupFD[0];
- client->wakeupSendFD = wakeupFD[1];
- wakeupFD[0] = wakeupFD[1] = -1;
+
+ client->eventCtx = g_main_context_new();
+ client->eventLoop = g_main_loop_new(client->eventCtx, FALSE);
client->hostname = g_strdup(hostname);
return client;
error:
- VIR_FORCE_CLOSE(wakeupFD[0]);
- VIR_FORCE_CLOSE(wakeupFD[1]);
virObjectUnref(client);
virObjectUnref(sock);
return NULL;
virObjectUnref(client->programs[i]);
VIR_FREE(client->programs);
- VIR_FORCE_CLOSE(client->wakeupSendFD);
- VIR_FORCE_CLOSE(client->wakeupReadFD);
+ g_main_loop_unref(client->eventLoop);
+ g_main_context_unref(client->eventCtx);
VIR_FREE(client->hostname);
}
}
+
static void virNetClientCloseInternal(virNetClientPtr client,
int reason)
{
* queue and close the client because we set client->wantClose.
*/
if (client->haveTheBuck) {
- char ignore = 1;
- size_t len = sizeof(ignore);
-
- if (safewrite(client->wakeupSendFD, &ignore, len) != len)
- VIR_ERROR(_("failed to wake up polling thread"));
+ g_main_loop_quit(client->eventLoop);
} else {
virNetClientIOEventLoopPassTheBuck(client, NULL);
}
#endif
+static gboolean
+virNetClientIOEventTLS(int fd,
+ GIOCondition ev,
+ gpointer opaque);
+
+static gboolean
+virNetClientTLSHandshake(virNetClientPtr client)
+{
+ GIOCondition ev;
+ int ret;
+
+ ret = virNetTLSSessionHandshake(client->tls);
+
+ if (ret <= 0)
+ return FALSE;
+
+ if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
+ VIR_NET_TLS_HANDSHAKE_RECVING)
+ ev = G_IO_IN;
+ else
+ ev = G_IO_OUT;
+
+ virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
+ ev,
+ client->eventCtx,
+ virNetClientIOEventTLS, client, NULL);
+
+ return TRUE;
+}
+
+
+static gboolean
+virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
+ GIOCondition ev G_GNUC_UNUSED,
+ gpointer opaque)
+{
+ virNetClientPtr client = opaque;
+
+ if (!virNetClientTLSHandshake(client))
+ g_main_loop_quit(client->eventLoop);
+
+ return G_SOURCE_REMOVE;
+}
+
+
+static gboolean
+virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
+ GIOCondition ev G_GNUC_UNUSED,
+ gpointer opaque)
+{
+ virNetClientPtr client = opaque;
+
+ g_main_loop_quit(client->eventLoop);
+
+ return G_SOURCE_REMOVE;
+}
+
+
int virNetClientSetTLSSession(virNetClientPtr client,
virNetTLSContextPtr tls)
{
int ret;
char buf[1];
int len;
- struct pollfd fds[1];
#ifndef WIN32
sigset_t oldmask, blockedsigs;
virNetSocketSetTLSSession(client->sock, client->tls);
- for (;;) {
- ret = virNetTLSSessionHandshake(client->tls);
-
- if (ret < 0)
- goto error;
- if (ret == 0)
- break;
-
- fds[0].fd = virNetSocketGetFD(client->sock);
- fds[0].revents = 0;
- if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
- VIR_NET_TLS_HANDSHAKE_RECVING)
- fds[0].events = POLLIN;
- else
- fds[0].events = POLLOUT;
-
+ virResetLastError();
+ if (virNetClientTLSHandshake(client)) {
#ifndef WIN32
/* Block SIGWINCH from interrupting poll in curses programs,
* then restore the original signal mask again immediately
ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
#endif /* !WIN32 */
- repoll:
- ret = poll(fds, G_N_ELEMENTS(fds), -1);
- if (ret < 0 && (errno == EAGAIN || errno == EINTR))
- goto repoll;
+ g_main_loop_run(client->eventLoop);
#ifndef WIN32
ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
#endif /* !WIN32 */
}
+ if (virGetLastErrorCode() != VIR_ERR_OK)
+ goto error;
+
ret = virNetTLSContextCheckCertificate(tls, client->tls);
if (ret < 0)
* etc. If we make the grade, it will send us a '\1' byte.
*/
- fds[0].fd = virNetSocketGetFD(client->sock);
- fds[0].revents = 0;
- fds[0].events = POLLIN;
+ virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
+ G_IO_IN,
+ client->eventCtx,
+ virNetClientIOEventTLSConfirm, client, NULL);
#ifndef WIN32
/* Block SIGWINCH from interrupting poll in curses programs */
ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
#endif /* !WIN32 */
- repoll2:
- ret = poll(fds, G_N_ELEMENTS(fds), -1);
- if (ret < 0 && (errno == EAGAIN || errno == EINTR))
- goto repoll2;
+ g_main_loop_run(client->eventLoop);
#ifndef WIN32
ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
void *opaque)
{
- struct pollfd *fd = opaque;
+ GIOCondition *ev = opaque;
if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
- fd->events |= POLLIN;
+ *ev |= G_IO_IN;
if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
- fd->events |= POLLOUT;
+ *ev |= G_IO_OUT;
return false;
}
}
+struct virNetClientIOEventData {
+ virNetClientPtr client;
+ GIOCondition rev;
+};
+
+static gboolean
+virNetClientIOEventFD(int fd G_GNUC_UNUSED,
+ GIOCondition ev,
+ gpointer opaque)
+{
+ struct virNetClientIOEventData *data = opaque;
+ data->rev = ev;
+ g_main_loop_quit(data->client->eventLoop);
+ return G_SOURCE_REMOVE;
+}
+
+
/*
* Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck
static int virNetClientIOEventLoop(virNetClientPtr client,
virNetClientCallPtr thiscall)
{
- struct pollfd fds[2];
bool error = false;
int closeReason;
- int ret;
-
- fds[0].fd = virNetSocketGetFD(client->sock);
- fds[1].fd = client->wakeupReadFD;
for (;;) {
- char ignore;
#ifndef WIN32
sigset_t oldmask, blockedsigs;
#endif /* !WIN32 */
int timeout = -1;
virNetMessagePtr msg = NULL;
+ GIOCondition ev = 0;
+ struct virNetClientIOEventData data = {
+ .client = client,
+ .rev = 0,
+ };
/* If we have existing SASL decoded data we don't want to sleep in
* the poll(), just check if any other FDs are also ready.
if (timeout == -1)
timeout = virKeepAliveTimeout(client->keepalive);
- fds[0].events = fds[0].revents = 0;
- fds[1].events = fds[1].revents = 0;
-
- fds[1].events = POLLIN;
-
/* Calculate poll events for calls */
virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopPollEvents,
- &fds[0]);
+ &ev);
/* We have to be prepared to receive stream data
* regardless of whether any of the calls waiting
* for dispatch are for streams.
*/
if (client->nstreams)
- fds[0].events |= POLLIN;
+ ev |= G_IO_IN;
+
+ virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
+ ev,
+ client->eventCtx,
+ virNetClientIOEventFD, &data, NULL);
/* Release lock while poll'ing so other threads
* can stuff themselves on the queue */
sigaddset(&blockedsigs, SIGCHLD);
# endif
sigaddset(&blockedsigs, SIGPIPE);
+
ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
#endif /* !WIN32 */
- repoll:
- ret = poll(fds, G_N_ELEMENTS(fds), timeout);
- if (ret < 0 && (errno == EAGAIN || errno == EINTR))
- goto repoll;
+ g_main_loop_run(client->eventLoop);
#ifndef WIN32
ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
virObjectLock(client);
- if (ret < 0) {
- virReportSystemError(errno,
- "%s", _("poll on socket failed"));
- goto error;
- }
-
if (virKeepAliveTrigger(client->keepalive, &msg)) {
virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_KEEPALIVE);
} else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) {
* the socket became readable so we consume it
*/
if (virNetSocketHasCachedData(client->sock))
- fds[0].revents |= POLLIN;
+ data.rev |= G_IO_IN;
/* If wantClose flag is set, pretend there was an error on the socket,
* but still read and process any data we received so far.
if (client->wantClose)
error = true;
- if (fds[1].revents) {
- VIR_DEBUG("Woken up from poll by other thread");
- if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
- virReportSystemError(errno, "%s",
- _("read on wakeup fd failed"));
- virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_ERROR);
- error = true;
- /* Fall through to process any pending data. */
- }
- }
-
- if (fds[0].revents & POLLHUP)
+ if (data.rev & G_IO_HUP)
closeReason = VIR_CONNECT_CLOSE_REASON_EOF;
else
closeReason = VIR_CONNECT_CLOSE_REASON_ERROR;
- if (fds[0].revents & POLLOUT) {
+ if (data.rev & G_IO_OUT) {
if (virNetClientIOHandleOutput(client) < 0) {
virNetClientMarkClose(client, closeReason);
error = true;
}
}
- if (fds[0].revents & POLLIN) {
+ if (data.rev & G_IO_IN) {
if (virNetClientIOHandleInput(client) < 0) {
virNetClientMarkClose(client, closeReason);
error = true;
if (error)
goto error;
- if (fds[0].revents & POLLHUP) {
+ if (data.rev & G_IO_HUP) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("received hangup event on socket"));
virNetClientMarkClose(client, closeReason);
goto error;
}
- if (fds[0].revents & POLLERR) {
+ if (data.rev & G_IO_ERR) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("received error event on socket"));
virNetClientMarkClose(client, closeReason);
/* Check to see if another thread is dispatching */
if (client->haveTheBuck) {
- char ignore = 1;
-
/* Force other thread to wakeup from poll */
- if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
- virNetClientCallRemove(&client->waitDispatch, thiscall);
- virReportSystemError(errno, "%s",
- _("failed to wake up polling thread"));
- return -1;
- }
+ g_main_loop_quit(client->eventLoop);
/* If we are non-blocking, detach the thread and keep the call in the
* queue. */