#include <netdb.h>
+#include <poll.h>
+
/* AI_ADDRCONFIG is missing on some systems. */
#ifndef AI_ADDRCONFIG
# define AI_ADDRCONFIG 0
#include "util.h"
#include "event.h"
+#ifdef WIN32
+#define pipe(fds) _pipe(fds,4096, _O_BINARY)
+#endif
+
+
static int inside_daemon = 0;
+struct remote_thread_call;
+
+
+enum {
+ REMOTE_MODE_WAIT_TX,
+ REMOTE_MODE_WAIT_RX,
+ REMOTE_MODE_COMPLETE,
+ REMOTE_MODE_ERROR,
+};
+
+struct remote_thread_call {
+ int mode;
+
+ /* 4 byte length, followed by RPC message header+body */
+ char buffer[4 + REMOTE_MESSAGE_MAX];
+ unsigned int bufferLength;
+ unsigned int bufferOffset;
+
+ unsigned int serial;
+ unsigned int proc_nr;
+
+ virCond cond;
+
+ xdrproc_t ret_filter;
+ char *ret;
+
+ remote_error err;
+
+ struct remote_thread_call *next;
+};
+
struct private_data {
virMutex lock;
int localUses; /* Ref count for private data */
char *hostname; /* Original hostname */
FILE *debugLog; /* Debug remote protocol */
+
#if HAVE_SASL
sasl_conn_t *saslconn; /* SASL context */
+
const char *saslDecoded;
unsigned int saslDecodedLength;
unsigned int saslDecodedOffset;
+
+ const char *saslEncoded;
+ unsigned int saslEncodedLength;
+ unsigned int saslEncodedOffset;
#endif
+
+ /* 4 byte length, followed by RPC message header+body */
+ char buffer[4 + REMOTE_MESSAGE_MAX];
+ unsigned int bufferLength;
+ unsigned int bufferOffset;
+
/* The list of domain event callbacks */
virDomainEventCallbackListPtr callbackList;
/* The queue of domain events generated
virDomainEventQueuePtr domainEvents;
/* Timer for flushing domainEvents queue */
int eventFlushTimer;
+
+ /* Self-pipe to wakeup threads waiting in poll() */
+ int wakeupSendFD;
+ int wakeupReadFD;
+
+ /* List of threads currently waiting for dispatch */
+ struct remote_thread_call *waitDispatch;
};
enum {
static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
void remoteDomainEventFired(int watch, int fd, int event, void *data);
-static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
void remoteDomainEventQueueFlush(int timer, void *opaque);
/*----------------------------------------------------------------------*/
virConnectAuthPtr auth ATTRIBUTE_UNUSED,
int flags)
{
+ int wakeupFD[2];
char *transport_str = NULL;
if (conn->uri) {
} /* switch (transport) */
+ if (virSetNonBlock(priv->sock) < 0) {
+ errorf (conn, VIR_ERR_SYSTEM_ERROR,
+ _("unable to make socket non-blocking %s"),
+ strerror(errno));
+ goto failed;
+ }
+
+ if (pipe(wakeupFD) < 0) {
+ errorf (conn, VIR_ERR_SYSTEM_ERROR,
+ _("unable to make pipe %s"),
+ strerror(errno));
+ goto failed;
+ }
+ priv->wakeupReadFD = wakeupFD[0];
+ priv->wakeupSendFD = wakeupFD[1];
/* Try and authenticate with server */
if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
"continuing without events.");
virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
}
}
/* Successful. */
}
remoteDriverLock(priv);
priv->localUses = 1;
+ priv->watch = -1;
if (flags & VIR_CONNECT_RO)
rflags |= VIR_DRV_OPEN_REMOTE_RO;
virEventRemoveTimeout(priv->eventFlushTimer);
/* Remove handle for remote events */
virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
}
/* Close socket. */
/*----------------------------------------------------------------------*/
-static int really_write (virConnectPtr conn, struct private_data *priv,
- int in_open, char *bytes, int len);
-static int really_read (virConnectPtr conn, struct private_data *priv,
- int in_open, char *bytes, int len);
-/* This function performs a remote procedure call to procedure PROC_NR.
- *
- * NB. This does not free the args structure (not desirable, since you
- * often want this allocated on the stack or else it contains strings
- * which come from the user). It does however free any intermediate
- * results, eg. the error structure if there is one.
- *
- * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
- * else Bad Things will happen in the XDR code.
- */
-static int
-doCall (virConnectPtr conn, struct private_data *priv,
- int flags /* if we are in virConnectOpen */,
- int proc_nr,
- xdrproc_t args_filter, char *args,
- xdrproc_t ret_filter, char *ret)
-{
- char buffer[REMOTE_MESSAGE_MAX];
- char buffer2[4];
- struct remote_message_header hdr;
+static struct remote_thread_call *
+prepareCall(virConnectPtr conn,
+ struct private_data *priv,
+ int flags,
+ int proc_nr,
+ xdrproc_t args_filter, char *args,
+ xdrproc_t ret_filter, char *ret)
+{
XDR xdr;
- int len;
- struct remote_error rerror;
+ struct remote_message_header hdr;
+ struct remote_thread_call *rv;
+
+ if (VIR_ALLOC(rv) < 0)
+ return NULL;
+
+ if (virCondInit(&rv->cond) < 0) {
+ VIR_FREE(rv);
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_INTERNAL_ERROR,
+ _("cannot initialize mutex"));
+ return NULL;
+ }
/* Get a unique serial number for this message. */
- int serial = priv->counter++;
+ rv->serial = priv->counter++;
+ rv->proc_nr = proc_nr;
+ rv->ret_filter = ret_filter;
+ rv->ret = ret;
hdr.prog = REMOTE_PROGRAM;
hdr.vers = REMOTE_PROTOCOL_VERSION;
hdr.proc = proc_nr;
hdr.direction = REMOTE_CALL;
- hdr.serial = serial;
+ hdr.serial = rv->serial;
hdr.status = REMOTE_OK;
/* Serialise header followed by args. */
- xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE);
+ xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE);
if (!xdr_remote_message_header (&xdr, &hdr)) {
error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
VIR_ERR_RPC, _("xdr_remote_message_header failed"));
- return -1;
+ goto error;
}
if (!(*args_filter) (&xdr, args)) {
error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
_("marshalling args"));
- return -1;
+ goto error;
}
/* Get the length stored in buffer. */
- len = xdr_getpos (&xdr);
+ rv->bufferLength = xdr_getpos (&xdr);
xdr_destroy (&xdr);
/* Length must include the length word itself (always encoded in
* 4 bytes as per RFC 4506).
*/
- len += 4;
+ rv->bufferLength += 4;
/* Encode the length word. */
- xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE);
- if (!xdr_int (&xdr, &len)) {
+ xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
+ if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
_("xdr_int (length word)"));
- return -1;
+ goto error;
}
xdr_destroy (&xdr);
- /* Send length word followed by header+args. */
- if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1 ||
- really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
- return -1;
+ return rv;
+
+error:
+ xdr_destroy (&xdr);
+ VIR_FREE(rv);
+ return NULL;
+}
+
+
+
+static int
+processCallWrite(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open /* if we are in virConnectOpen */,
+ const char *bytes, int len)
+{
+ int ret;
+
+ if (priv->uses_tls) {
+ tls_resend:
+ ret = gnutls_record_send (priv->session, bytes, len);
+ if (ret < 0) {
+ if (ret == GNUTLS_E_INTERRUPTED)
+ goto tls_resend;
+ if (ret == GNUTLS_E_AGAIN)
+ return 0;
+
+ error (in_open ? NULL : conn,
+ VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+ return -1;
+ }
+ } else {
+ resend:
+ ret = send (priv->sock, bytes, len, 0);
+ if (ret == -1) {
+ if (errno == EINTR)
+ goto resend;
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ error (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR, strerror (errno));
+ return -1;
+
+ }
+ }
+
+ return ret;
+}
+
+
+static int
+processCallRead(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open /* if we are in virConnectOpen */,
+ char *bytes, int len)
+{
+ int ret;
+
+ if (priv->uses_tls) {
+ tls_resend:
+ ret = gnutls_record_recv (priv->session, bytes, len);
+ if (ret == GNUTLS_E_INTERRUPTED)
+ goto tls_resend;
+ if (ret == GNUTLS_E_AGAIN)
+ return 0;
+
+ /* Treat 0 == EOF as an error */
+ if (ret <= 0) {
+ if (ret < 0)
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_GNUTLS_ERROR,
+ _("failed to read from TLS socket %s"),
+ gnutls_strerror (ret));
+ else
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR,
+ "%s", _("server closed connection"));
+ return -1;
+ }
+ } else {
+ resend:
+ ret = recv (priv->sock, bytes, len, 0);
+ if (ret <= 0) {
+ if (ret == -1) {
+ if (errno == EINTR)
+ goto resend;
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR,
+ _("failed to read from socket %s"),
+ strerror (errno));
+ } else {
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR,
+ "%s", _("server closed connection"));
+ }
+ return -1;
+ }
+ }
+
+ return ret;
+}
+
+
+static int
+processCallSendOne(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open,
+ struct remote_thread_call *thecall)
+{
+#if HAVE_SASL
+ if (priv->saslconn) {
+ const char *output;
+ unsigned int outputlen;
+ int err, ret;
+
+ if (!priv->saslEncoded) {
+ err = sasl_encode(priv->saslconn,
+ thecall->buffer + thecall->bufferOffset,
+ thecall->bufferLength - thecall->bufferOffset,
+ &output, &outputlen);
+ if (err != SASL_OK) {
+ errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ _("failed to encode SASL data: %s"),
+ sasl_errstring(err, NULL, NULL));
+ return -1;
+ }
+ priv->saslEncoded = output;
+ priv->saslEncodedLength = outputlen;
+ priv->saslEncodedOffset = 0;
+
+ thecall->bufferOffset = thecall->bufferLength;
+ }
+
+ ret = processCallWrite(conn, priv, in_open,
+ priv->saslEncoded + priv->saslEncodedOffset,
+ priv->saslEncodedLength - priv->saslEncodedOffset);
+ if (ret < 0)
+ return ret;
+ priv->saslEncodedOffset += ret;
+
+ if (priv->saslEncodedOffset == priv->saslEncodedLength) {
+ priv->saslEncoded = NULL;
+ priv->saslEncodedOffset = priv->saslEncodedLength = 0;
+ thecall->mode = REMOTE_MODE_WAIT_RX;
+ }
+ } else {
+#endif
+ int ret;
+ ret = processCallWrite(conn, priv, in_open,
+ thecall->buffer + thecall->bufferOffset,
+ thecall->bufferLength - thecall->bufferOffset);
+ if (ret < 0)
+ return ret;
+ thecall->bufferOffset += ret;
+
+ if (thecall->bufferOffset == thecall->bufferLength) {
+ thecall->bufferOffset = thecall->bufferLength = 0;
+ thecall->mode = REMOTE_MODE_WAIT_RX;
+ }
+#if HAVE_SASL
+ }
+#endif
+ return 0;
+}
+
+
+static int
+processCallSend(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ struct remote_thread_call *thecall = priv->waitDispatch;
+
+ while (thecall &&
+ thecall->mode != REMOTE_MODE_WAIT_TX)
+ thecall = thecall->next;
+
+ if (!thecall)
+ return -1; /* Shouldn't happen, but you never know... */
+
+ while (thecall) {
+ int ret = processCallSendOne(conn, priv, in_open, thecall);
+ if (ret < 0)
+ return ret;
+
+ if (thecall->mode == REMOTE_MODE_WAIT_TX)
+ return 0; /* Blocking write, to back to event loop */
+
+ thecall = thecall->next;
+ }
+
+ return 0; /* No more calls to send, all done */
+}
+
+static int
+processCallRecvSome(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ unsigned int wantData;
+
+ /* Start by reading length word */
+ if (priv->bufferLength == 0)
+ priv->bufferLength = 4;
+
+ wantData = priv->bufferLength - priv->bufferOffset;
+
+#if HAVE_SASL
+ if (priv->saslconn) {
+ if (priv->saslDecoded == NULL) {
+ char encoded[8192];
+ unsigned int encodedLen = sizeof(encoded);
+ int ret, err;
+ ret = processCallRead(conn, priv, in_open,
+ encoded, encodedLen);
+ if (ret < 0)
+ return -1;
+ if (ret == 0)
+ return 0;
+
+ err = sasl_decode(priv->saslconn, encoded, ret,
+ &priv->saslDecoded, &priv->saslDecodedLength);
+ if (err != SASL_OK) {
+ errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ _("failed to decode SASL data: %s"),
+ sasl_errstring(err, NULL, NULL));
+ return -1;
+ }
+ priv->saslDecodedOffset = 0;
+ }
+
+ if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData)
+ wantData = (priv->saslDecodedLength - priv->saslDecodedOffset);
+
+ memcpy(priv->buffer + priv->bufferOffset,
+ priv->saslDecoded + priv->saslDecodedOffset,
+ wantData);
+ priv->saslDecodedOffset += wantData;
+ priv->bufferOffset += wantData;
+ if (priv->saslDecodedOffset == priv->saslDecodedLength) {
+ priv->saslDecodedLength = priv->saslDecodedLength = 0;
+ priv->saslDecoded = NULL;
+ }
+
+ return wantData;
+ } else {
+#endif
+ int ret;
+
+ ret = processCallRead(conn, priv, in_open,
+ priv->buffer + priv->bufferOffset,
+ wantData);
+ if (ret < 0)
+ return -1;
+ if (ret == 0)
+ return 0;
+
+ priv->bufferOffset += ret;
+
+ return ret;
+#if HAVE_SASL
+ }
+#endif
+}
-retry_read:
- /* Read and deserialise length word. */
- if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
- return -1;
- xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
+static void
+processCallAsyncEvent(virConnectPtr conn, struct private_data *priv,
+ int in_open,
+ remote_message_header *hdr,
+ XDR *xdr) {
+ /* An async message has come in while we were waiting for the
+ * response. Process it to pull it off the wire, and try again
+ */
+ DEBUG0("Encountered an event while waiting for a response");
+
+ if (in_open) {
+ DEBUG("Ignoring bogus event %d received while in open", hdr->proc);
+ return;
+ }
+
+ if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) {
+ remoteDomainQueueEvent(conn, xdr);
+ virEventUpdateTimeout(priv->eventFlushTimer, 0);
+ } else {
+ DEBUG("Unexpected event proc %d", hdr->proc);
+ }
+}
+
+static int
+processCallRecvLen(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ XDR xdr;
+ int len;
+
+ xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
if (!xdr_int (&xdr, &len)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ error (in_open ? NULL : conn,
VIR_ERR_RPC, _("xdr_int (length word, reply)"));
return -1;
}
len -= 4;
if (len < 0 || len > REMOTE_MESSAGE_MAX) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ error (in_open ? NULL : conn,
VIR_ERR_RPC, _("packet received from server too large"));
return -1;
}
- /* Read reply header and what follows (either a ret or an error). */
- if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1)
- return -1;
+ /* Extend our declared buffer length and carry
+ on reading the header + payload */
+ priv->bufferLength += len;
+ DEBUG("Got length, now need %d total (%d more)", priv->bufferLength, len);
+ return 0;
+}
+
+
+static int
+processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ XDR xdr;
+ struct remote_message_header hdr;
+ int len = priv->bufferLength - 4;
+ struct remote_thread_call *thecall;
/* Deserialise reply header. */
- xdrmem_create (&xdr, buffer, len, XDR_DECODE);
+ xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
if (!xdr_remote_message_header (&xdr, &hdr)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ error (in_open ? NULL : conn,
VIR_ERR_RPC, _("invalid header in reply"));
return -1;
}
/* Check program, version, etc. are what we expect. */
if (hdr.prog != REMOTE_PROGRAM) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown program (received %x, expected %x)"),
- hdr.prog, REMOTE_PROGRAM);
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown program (received %x, expected %x)"),
+ hdr.prog, REMOTE_PROGRAM);
return -1;
}
if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown protocol version (received %x, expected %x)"),
- hdr.vers, REMOTE_PROTOCOL_VERSION);
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown protocol version (received %x, expected %x)"),
+ hdr.vers, REMOTE_PROTOCOL_VERSION);
return -1;
}
- if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
- hdr.direction == REMOTE_MESSAGE) {
- /* An async message has come in while we were waiting for the
- * response. Process it to pull it off the wire, and try again
- */
- DEBUG0("Encountered an event while waiting for a response");
-
- remoteDomainQueueEvent(conn, &xdr);
- virEventUpdateTimeout(priv->eventFlushTimer, 0);
+ /* Async events from server need special handling */
+ if (hdr.direction == REMOTE_MESSAGE) {
+ processCallAsyncEvent(conn, priv, in_open,
+ &hdr, &xdr);
+ xdr_destroy(&xdr);
+ return 0;
+ }
- DEBUG0("Retrying read");
- xdr_destroy (&xdr);
- goto retry_read;
- }
- if (hdr.proc != proc_nr) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown procedure (received %x, expected %x)"),
- hdr.proc, proc_nr);
+ if (hdr.direction != REMOTE_REPLY) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("got unexpected RPC call %d from server"),
+ hdr.proc);
+ xdr_destroy(&xdr);
return -1;
}
- if (hdr.direction != REMOTE_REPLY) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown direction (received %x, expected %x)"),
- hdr.direction, REMOTE_REPLY);
+
+ /* Ok, definitely got an RPC reply now find
+ out who's been waiting for it */
+
+ thecall = priv->waitDispatch;
+ while (thecall &&
+ thecall->serial != hdr.serial)
+ thecall = thecall->next;
+
+ if (!thecall) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("no call waiting for reply with serial %d"),
+ hdr.serial);
+ xdr_destroy(&xdr);
return -1;
}
- if (hdr.serial != serial) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown serial (received %x, expected %x)"),
- hdr.serial, serial);
+
+ if (hdr.proc != thecall->proc_nr) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown procedure (received %x, expected %x)"),
+ hdr.proc, thecall->proc_nr);
+ xdr_destroy (&xdr);
return -1;
}
*/
switch (hdr.status) {
case REMOTE_OK:
- if (!(*ret_filter) (&xdr, ret)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+ if (!(*thecall->ret_filter) (&xdr, thecall->ret)) {
+ error (in_open ? NULL : conn, VIR_ERR_RPC,
_("unmarshalling ret"));
return -1;
}
+ thecall->mode = REMOTE_MODE_COMPLETE;
xdr_destroy (&xdr);
return 0;
case REMOTE_ERROR:
- memset (&rerror, 0, sizeof rerror);
- if (!xdr_remote_error (&xdr, &rerror)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ memset (&thecall->err, 0, sizeof thecall->err);
+ if (!xdr_remote_error (&xdr, &thecall->err)) {
+ error (in_open ? NULL : conn,
VIR_ERR_RPC, _("unmarshalling remote_error"));
return -1;
}
xdr_destroy (&xdr);
- /* See if caller asked us to keep quiet about missing RPCs
- * eg for interop with older servers */
- if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
- rerror.domain == VIR_FROM_REMOTE &&
- rerror.code == VIR_ERR_RPC &&
- rerror.level == VIR_ERR_ERROR &&
- STRPREFIX(*rerror.message, "unknown procedure")) {
- return -2;
- }
- server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror);
- xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror);
- return -1;
+ thecall->mode = REMOTE_MODE_ERROR;
+ return 0;
default:
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown status (received %x)"),
- hdr.status);
+ virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown status (received %x)"),
+ hdr.status);
xdr_destroy (&xdr);
return -1;
}
static int
-call (virConnectPtr conn, struct private_data *priv,
- int flags /* if we are in virConnectOpen */,
- int proc_nr,
- xdrproc_t args_filter, char *args,
- xdrproc_t ret_filter, char *ret)
-{
- int rv;
- /*
- * Avoid needless wake-ups of the event loop in the
- * case where this call is being made from a different
- * thread than the event loop. These wake-ups would
- * cause the event loop thread to be blocked on the
- * mutex for the duration of the call
+processCallRecv(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ int ret;
+
+ /* Read as much data as is available, until we get
+ * EGAIN
*/
- if (priv->watch >= 0)
- virEventUpdateHandle(priv->watch, 0);
+ for (;;) {
+ ret = processCallRecvSome(conn, priv, in_open);
- rv = doCall(conn, priv,flags, proc_nr,
- args_filter, args,
- ret_filter, ret);
+ if (ret < 0)
+ return -1;
+ if (ret == 0)
+ return 0; /* Blocking on read */
- if (priv->watch >= 0)
- virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE);
- return rv;
+ /* Check for completion of our goal */
+ if (priv->bufferOffset == priv->bufferLength) {
+ if (priv->bufferOffset == 4) {
+ ret = processCallRecvLen(conn, priv, in_open);
+ } else {
+ ret = processCallRecvMsg(conn, priv, in_open);
+ priv->bufferOffset = priv->bufferLength = 0;
+ }
+ if (ret < 0)
+ return -1;
+ }
+ }
}
+/*
+ * Process all calls pending dispatch/receive until we
+ * get a reply to our own call. Then quit and pass the buck
+ * to someone else.
+ */
static int
-really_write_buf (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- const char *bytes, int len)
+processCalls(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open,
+ struct remote_thread_call *thiscall)
{
- const char *p;
- int err;
+ struct pollfd fds[2];
+ int ret;
- p = bytes;
- if (priv->uses_tls) {
- do {
- err = gnutls_record_send (priv->session, p, len);
- if (err < 0) {
- if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN)
- continue;
- error (in_open ? NULL : conn,
- VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
+ fds[0].fd = priv->sock;
+ fds[1].fd = priv->wakeupReadFD;
+
+ for (;;) {
+ struct remote_thread_call *tmp = priv->waitDispatch;
+ struct remote_thread_call *prev;
+ char ignore;
+
+ fds[0].events = fds[0].revents = 0;
+ fds[1].events = fds[1].revents = 0;
+
+ fds[1].events = POLLIN;
+ while (tmp) {
+ if (tmp->mode == REMOTE_MODE_WAIT_RX)
+ fds[0].events |= POLLIN;
+ if (tmp->mode == REMOTE_MODE_WAIT_TX)
+ fds[0].events |= POLLOUT;
+
+ tmp = tmp->next;
+ }
+
+ /* Release lock while poll'ing so other threads
+ * can stuff themselves on the queue */
+ remoteDriverUnlock(priv);
+
+ repoll:
+ ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+ if (ret < 0 && errno == EINTR)
+ goto repoll;
+ remoteDriverLock(priv);
+
+ if (fds[1].revents) {
+ DEBUG0("Woken up from poll by other thread");
+ saferead(priv->wakeupReadFD, &ignore, sizeof(ignore));
+ }
+
+ if (ret < 0) {
+ if (errno == EWOULDBLOCK)
+ continue;
+ errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ _("poll on socket failed %s"), strerror(errno));
+ return -1;
+ }
+
+ if (fds[0].revents & POLLOUT) {
+ if (processCallSend(conn, priv, in_open) < 0)
return -1;
- }
- len -= err;
- p += err;
}
- while (len > 0);
- } else {
- do {
- err = send (priv->sock, p, len, 0);
- if (err == -1) {
- if (errno == EINTR || errno == EAGAIN)
- continue;
- error (in_open ? NULL : conn,
- VIR_ERR_SYSTEM_ERROR, strerror (errno));
+
+ if (fds[0].revents & POLLIN) {
+ if (processCallRecv(conn, priv, in_open) < 0)
return -1;
+ }
+
+ /* Iterate through waiting threads and if
+ * any are complete then tell 'em to wakeup
+ */
+ tmp = priv->waitDispatch;
+ prev = NULL;
+ while (tmp) {
+ if (tmp != thiscall &&
+ (tmp->mode == REMOTE_MODE_COMPLETE ||
+ tmp->mode == REMOTE_MODE_ERROR)) {
+ /* Take them out of the list */
+ if (prev)
+ prev->next = tmp->next;
+ else
+ priv->waitDispatch = tmp->next;
+
+ /* And wake them up....
+ * ...they won't actually wakeup until
+ * we release our mutex a short while
+ * later...
+ */
+ DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp, priv->waitDispatch);
+ virCondSignal(&tmp->cond);
}
- len -= err;
- p += err;
+ prev = tmp;
+ tmp = tmp->next;
}
- while (len > 0);
- }
- return 0;
-}
+ /* Now see if *we* are done */
+ if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+ thiscall->mode == REMOTE_MODE_ERROR) {
+ /* We're at head of the list already, so
+ * remove us
+ */
+ priv->waitDispatch = thiscall->next;
+ DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr, thiscall, priv->waitDispatch);
+ /* See if someone else is still waiting
+ * and if so, then pass the buck ! */
+ if (priv->waitDispatch) {
+ DEBUG("Passing the buck to %d %p", priv->waitDispatch->proc_nr, priv->waitDispatch);
+ virCondSignal(&priv->waitDispatch->cond);
+ }
+ return 0;
+ }
-static int
-really_write_plain (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
- return really_write_buf(conn, priv, in_open, bytes, len);
+
+ if (fds[0].revents & (POLLHUP | POLLERR)) {
+ errorf(in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ "%s", _("received hangup / error event on socket"));
+ return -1;
+ }
+ }
}
-#if HAVE_SASL
+/*
+ * This function performs a remote procedure call to procedure PROC_NR.
+ *
+ * NB. This does not free the args structure (not desirable, since you
+ * often want this allocated on the stack or else it contains strings
+ * which come from the user). It does however free any intermediate
+ * results, eg. the error structure if there is one.
+ *
+ * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
+ * else Bad Things will happen in the XDR code.
+ *
+ * NB(3) You must have the private_data lock before calling this
+ *
+ * NB(4) This is very complicated. Due to connection cloning, multiple
+ * threads can want to use the socket at once. Obviously only one of
+ * them can. So if someone's using the socket, other threads are put
+ * to sleep on condition variables. THe existing thread may completely
+ * send & receive their RPC call/reply while they're asleep. Or it
+ * may only get around to dealing with sending the call. Or it may
+ * get around to neither. So upon waking up from slumber, the other
+ * thread may or may not have more work todo.
+ *
+ * We call this dance 'passing the buck'
+ *
+ * http://en.wikipedia.org/wiki/Passing_the_buck
+ *
+ * "Buck passing or passing the buck is the action of transferring
+ * responsibility or blame unto another person. It is also used as
+ * a strategy in power politics when the actions of one country/
+ * nation are blamed on another, providing an opportunity for war."
+ *
+ * NB(5) Don't Panic!
+ */
static int
-really_write_sasl (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
+call (virConnectPtr conn, struct private_data *priv,
+ int flags /* if we are in virConnectOpen */,
+ int proc_nr,
+ xdrproc_t args_filter, char *args,
+ xdrproc_t ret_filter, char *ret)
{
- const char *output;
- unsigned int outputlen;
- int err;
+ int rv;
+ struct remote_thread_call *thiscall;
- err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen);
- if (err != SASL_OK) {
+ DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch);
+ thiscall = prepareCall(conn, priv, flags, proc_nr,
+ args_filter, args,
+ ret_filter, ret);
+
+ if (!thiscall) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_NO_MEMORY, NULL);
return -1;
}
- return really_write_buf(conn, priv, in_open, output, outputlen);
-}
-#endif
-
-static int
-really_write (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
-#if HAVE_SASL
- if (priv->saslconn)
- return really_write_sasl(conn, priv, in_open, bytes, len);
- else
-#endif
- return really_write_plain(conn, priv, in_open, bytes, len);
-}
+ /* Check to see if another thread is dispatching */
+ if (priv->waitDispatch) {
+ /* Stick ourselves on the end of the wait queue */
+ struct remote_thread_call *tmp = priv->waitDispatch;
+ char ignore = 1;
+ while (tmp && tmp->next)
+ tmp = tmp->next;
+ if (tmp)
+ tmp->next = thiscall;
+ else
+ priv->waitDispatch = thiscall;
-static int
-really_read_buf (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
- int err;
+ /* Force other thread to wakup from poll */
+ safewrite(priv->wakeupSendFD, &ignore, sizeof(ignore));
- if (priv->uses_tls) {
- tlsreread:
- err = gnutls_record_recv (priv->session, bytes, len);
- if (err < 0) {
- if (err == GNUTLS_E_INTERRUPTED)
- goto tlsreread;
- error (in_open ? NULL : conn,
- VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
+ DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+ /* Go to sleep while other thread is working... */
+ if (virCondWait(&thiscall->cond, &priv->lock) < 0) {
+ if (priv->waitDispatch == thiscall) {
+ priv->waitDispatch = thiscall->next;
+ } else {
+ tmp = priv->waitDispatch;
+ while (tmp && tmp->next &&
+ tmp->next != thiscall) {
+ tmp = tmp->next;
+ }
+ if (tmp && tmp->next == thiscall)
+ tmp->next = thiscall->next;
+ }
+ errorf(flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_INTERNAL_ERROR, "%s",
+ _("failed to wait on condition"));
+ VIR_FREE(thiscall);
return -1;
}
- if (err == 0) {
- error (in_open ? NULL : conn,
- VIR_ERR_RPC, _("socket closed unexpectedly"));
- return -1;
+
+ DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+ /* Two reasons we can be woken up
+ * 1. Other thread has got our reply ready for us
+ * 2. Other thread is all done, and it is our turn to
+ * be the dispatcher to finish waiting for
+ * our reply
+ */
+ if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+ thiscall->mode == REMOTE_MODE_ERROR) {
+ /*
+ * We avoided catching the buck and our reply is ready !
+ * We've already had 'thiscall' removed from the list
+ * so just need to (maybe) handle errors & free it
+ */
+ goto cleanup;
}
+
+ /* Grr, someone passed the buck onto us ... */
+
} else {
- reread:
- err = recv (priv->sock, bytes, len, 0);
- if (err == -1) {
- if (errno == EINTR)
- goto reread;
- error (in_open ? NULL : conn,
- VIR_ERR_SYSTEM_ERROR, strerror (errno));
- return -1;
- }
- if (err == 0) {
- error (in_open ? NULL : conn,
- VIR_ERR_RPC, _("socket closed unexpectedly"));
- return -1;
+ /* We're first to catch the buck */
+ priv->waitDispatch = thiscall;
+ }
+
+ DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+ /*
+ * The buck stops here!
+ *
+ * At this point we're about to own the dispatch
+ * process...
+ */
+
+ /*
+ * Avoid needless wake-ups of the event loop in the
+ * case where this call is being made from a different
+ * thread than the event loop. These wake-ups would
+ * cause the event loop thread to be blocked on the
+ * mutex for the duration of the call
+ */
+ if (priv->watch >= 0)
+ virEventUpdateHandle(priv->watch, 0);
+
+ rv = processCalls(conn, priv,
+ flags & REMOTE_CALL_IN_OPEN ? 1 : 0,
+ thiscall);
+
+ if (priv->watch >= 0)
+ virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE);
+
+ if (rv < 0) {
+ VIR_FREE(thiscall);
+ return -1;
+ }
+
+cleanup:
+ DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+ if (thiscall->mode == REMOTE_MODE_ERROR) {
+ /* See if caller asked us to keep quiet about missing RPCs
+ * eg for interop with older servers */
+ if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
+ thiscall->err.domain == VIR_FROM_REMOTE &&
+ thiscall->err.code == VIR_ERR_RPC &&
+ thiscall->err.level == VIR_ERR_ERROR &&
+ STRPREFIX(*thiscall->err.message, "unknown procedure")) {
+ rv = -2;
+ } else {
+ server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ &thiscall->err);
+ rv = -1;
}
+ } else {
+ rv = 0;
}
+ VIR_FREE(thiscall);
+ return rv;
+}
- return err;
+/**
+ * remoteDomainReadEvent
+ *
+ * Read the event data off the wire
+ */
+static virDomainEventPtr
+remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
+{
+ remote_domain_event_ret ret;
+ virDomainPtr dom;
+ virDomainEventPtr event = NULL;
+ memset (&ret, 0, sizeof ret);
+
+ /* unmarshall parameters, and process it*/
+ if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+ error (conn, VIR_ERR_RPC,
+ _("remoteDomainProcessEvent: unmarshalling ret"));
+ return NULL;
+ }
+
+ dom = get_nonnull_domain(conn,ret.dom);
+ if (!dom)
+ return NULL;
+
+ event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
+
+ virDomainFree(dom);
+ return event;
}
-static int
-really_read_plain (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
+static void
+remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
{
- do {
- int ret = really_read_buf (conn, priv, in_open, bytes, len);
- if (ret < 0)
- return -1;
+ struct private_data *priv = conn->privateData;
+ virDomainEventPtr event;
- len -= ret;
- bytes += ret;
- } while (len > 0);
+ event = remoteDomainReadEvent(conn, xdr);
+ if (!event)
+ return;
- return 0;
+ if (virDomainEventQueuePush(priv->domainEvents,
+ event) < 0) {
+ DEBUG0("Error adding event to queue");
+ virDomainEventFree(event);
+ }
}
-#if HAVE_SASL
-static int
-really_read_sasl (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
+/** remoteDomainEventFired:
+ *
+ * The callback for monitoring the remote socket
+ * for event data
+ */
+void
+remoteDomainEventFired(int watch,
+ int fd,
+ int event,
+ void *opaque)
{
- do {
- int want, got;
- if (priv->saslDecoded == NULL) {
- char encoded[8192];
- int encodedLen = sizeof(encoded);
- int err, ret;
- ret = really_read_buf (conn, priv, in_open, encoded, encodedLen);
- if (ret < 0)
- return -1;
+ virConnectPtr conn = opaque;
+ struct private_data *priv = conn->privateData;
- err = sasl_decode(priv->saslconn, encoded, ret,
- &priv->saslDecoded, &priv->saslDecodedLength);
- }
+ remoteDriverLock(priv);
- got = priv->saslDecodedLength - priv->saslDecodedOffset;
- want = len;
- if (want > got)
- want = got;
+ /* This should be impossible, but it doesn't hurt to check */
+ if (priv->waitDispatch)
+ goto done;
- memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want);
- priv->saslDecodedOffset += want;
- if (priv->saslDecodedOffset == priv->saslDecodedLength) {
- priv->saslDecoded = NULL;
- priv->saslDecodedOffset = priv->saslDecodedLength = 0;
- }
- bytes += want;
- len -= want;
- } while (len > 0);
+ DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
- return 0;
+ if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
+ DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+ "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+ virEventRemoveHandle(watch);
+ priv->watch = -1;
+ goto done;
+ }
+
+ if (fd != priv->sock) {
+ virEventRemoveHandle(watch);
+ priv->watch = -1;
+ goto done;
+ }
+
+ if (processCallRecv(conn, priv, 0) < 0)
+ DEBUG0("Something went wrong during async message processing");
+
+done:
+ remoteDriverUnlock(priv);
}
-#endif
-static int
-really_read (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
+void
+remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
{
-#if HAVE_SASL
- if (priv->saslconn)
- return really_read_sasl (conn, priv, in_open, bytes, len);
- else
-#endif
- return really_read_plain (conn, priv, in_open, bytes, len);
+ virConnectPtr conn = opaque;
+ struct private_data *priv = conn->privateData;
+
+ remoteDriverLock(priv);
+
+ virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
+ virDomainEventDispatchDefaultFunc, NULL);
+ virEventUpdateTimeout(priv->eventFlushTimer, -1);
+
+ remoteDriverUnlock(priv);
}
+
/* For errors internal to this library. */
static void
error (virConnectPtr conn, virErrorNumber code, const char *info)
return 0;
}
-/**
- * remoteDomainReadEvent
- *
- * Read the event data off the wire
- */
-static virDomainEventPtr
-remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
-{
- remote_domain_event_ret ret;
- virDomainPtr dom;
- virDomainEventPtr event = NULL;
- memset (&ret, 0, sizeof ret);
-
- /* unmarshall parameters, and process it*/
- if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
- error (conn, VIR_ERR_RPC,
- _("remoteDomainProcessEvent: unmarshalling ret"));
- return NULL;
- }
-
- dom = get_nonnull_domain(conn,ret.dom);
- if (!dom)
- return NULL;
-
- event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
-
- virDomainFree(dom);
- return event;
-}
-
-static void
-remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
-{
- struct private_data *priv = conn->privateData;
- virDomainEventPtr event;
-
- event = remoteDomainReadEvent(conn, xdr);
- if (!event)
- return;
-
- DEBUG0("Calling domain event callbacks (no queue)");
- virDomainEventDispatch(event, priv->callbackList,
- virDomainEventDispatchDefaultFunc, NULL);
- virDomainEventFree(event);
-}
-
-static void
-remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
-{
- struct private_data *priv = conn->privateData;
- virDomainEventPtr event;
-
- event = remoteDomainReadEvent(conn, xdr);
- if (!event)
- return;
-
- if (virDomainEventQueuePush(priv->domainEvents,
- event) < 0) {
- DEBUG0("Error adding event to queue");
- virDomainEventFree(event);
- }
-}
-
-/** remoteDomainEventFired:
- *
- * The callback for monitoring the remote socket
- * for event data
- */
-void
-remoteDomainEventFired(int watch,
- int fd,
- int event,
- void *opaque)
-{
- char buffer[REMOTE_MESSAGE_MAX];
- char buffer2[4];
- struct remote_message_header hdr;
- XDR xdr;
- int len;
-
- virConnectPtr conn = opaque;
- struct private_data *priv = conn->privateData;
-
- remoteDriverLock(priv);
-
- DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
-
- if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
- DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
- "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
- virEventRemoveHandle(watch);
- goto done;
- }
-
- if (fd != priv->sock) {
- virEventRemoveHandle(watch);
- goto done;
- }
-
- /* Read and deserialise length word. */
- if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
- goto done;
-
- xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
- if (!xdr_int (&xdr, &len)) {
- error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
- goto done;
- }
- xdr_destroy (&xdr);
-
- /* Length includes length word - adjust to real length to read. */
- len -= 4;
-
- if (len < 0 || len > REMOTE_MESSAGE_MAX) {
- error (conn, VIR_ERR_RPC, _("packet received from server too large"));
- goto done;
- }
-
- /* Read reply header and what follows (either a ret or an error). */
- if (really_read (conn, priv, 0, buffer, len) == -1) {
- error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
- goto done;
- }
-
- /* Deserialise reply header. */
- xdrmem_create (&xdr, buffer, len, XDR_DECODE);
- if (!xdr_remote_message_header (&xdr, &hdr)) {
- error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
- goto done;
- }
-
- if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
- hdr.direction == REMOTE_MESSAGE) {
- DEBUG0("Encountered an async event");
- remoteDomainProcessEvent(conn, &xdr);
- } else {
- DEBUG0("invalid proc in event firing");
- error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
- }
-
-done:
- remoteDriverUnlock(priv);
-}
-
-void
-remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
-{
- virConnectPtr conn = opaque;
- struct private_data *priv = conn->privateData;
-
- remoteDriverLock(priv);
-
- virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
- virDomainEventDispatchDefaultFunc, NULL);
- virEventUpdateTimeout(priv->eventFlushTimer, -1);
-
- remoteDriverUnlock(priv);
-}