libvirtd.c libvirtd.h \
remote.c remote.h \
dispatch.c dispatch.h \
+ stream.c stream.h \
remote_dispatch_prototypes.h \
remote_dispatch_table.h \
remote_dispatch_args.h \
{
remoteDispatchStringError(rerr,
VIR_ERR_NO_MEMORY,
- NULL);
+ "out of memory");
}
unsigned int len;
struct qemud_client_message *msg = NULL;
+ DEBUG("prog=%d ver=%d proc=%d type=%d serial=%d, msg=%s",
+ program, version, procedure, type, serial,
+ rerr->message ? *rerr->message : "(none)");
+
if (VIR_ALLOC(msg) < 0)
goto fatal_error;
*
* Returns 0 if the error was sent, -1 upon fatal error
*/
-static int
+int
remoteSerializeReplyError(struct qemud_client *client,
remote_error *rerr,
remote_message_header *req) {
+ /*
+ * For data streams, errors are sent back as data streams
+ * For method calls, errors are sent back as method replies
+ */
return remoteSerializeError(client,
rerr,
req->prog,
req->vers,
req->proc,
- REMOTE_REPLY,
+ req->type == REMOTE_STREAM ? REMOTE_STREAM : REMOTE_REPLY,
req->serial);
}
+int
+remoteSerializeStreamError(struct qemud_client *client,
+ remote_error *rerr,
+ int proc,
+ int serial)
+{
+ return remoteSerializeError(client,
+ rerr,
+ REMOTE_PROGRAM,
+ REMOTE_PROTOCOL_VERSION,
+ proc,
+ REMOTE_STREAM,
+ serial);
+}
+
/*
* @msg: the complete incoming message, whose header to decode
*
{
remote_error rerr;
+ DEBUG("prog=%d ver=%d type=%d satus=%d serial=%d proc=%d",
+ msg->hdr.prog, msg->hdr.vers, msg->hdr.type,
+ msg->hdr.status, msg->hdr.serial, msg->hdr.proc);
+
memset(&rerr, 0, sizeof rerr);
/* Check version, etc. */
case REMOTE_CALL:
return remoteDispatchClientCall(server, client, msg);
+ case REMOTE_STREAM:
+ /* Since stream data is non-acked, async, we may continue to received
+ * stream packets after we closed down a stream. Just drop & ignore
+ * these.
+ */
+ VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d",
+ msg->hdr.serial, msg->hdr.proc, msg->hdr.status);
+ qemudClientMessageRelease(client, msg);
+ break;
+
default:
remoteDispatchFormatError (&rerr, _("type (%d) != REMOTE_CALL"),
(int) msg->hdr.type);
+ goto error;
}
+ return 0;
+
error:
return remoteSerializeReplyError(client, &rerr, &msg->hdr);
}
fatal_error:
return -1;
}
+
+
+int
+remoteSendStreamData(struct qemud_client *client,
+ struct qemud_client_stream *stream,
+ const char *data,
+ size_t len)
+{
+ struct qemud_client_message *msg;
+ XDR xdr;
+
+ DEBUG("client=%p stream=%p data=%p len=%d", client, stream, data, len);
+
+ if (VIR_ALLOC(msg) < 0) {
+ return -1;
+ }
+
+ /* Return header. We're re-using same message object, so
+ * only need to tweak type/status fields */
+ msg->hdr.prog = REMOTE_PROGRAM;
+ msg->hdr.vers = REMOTE_PROTOCOL_VERSION;
+ msg->hdr.proc = stream->procedure;
+ msg->hdr.type = REMOTE_STREAM;
+ msg->hdr.serial = stream->serial;
+ /*
+ * NB
+ * data != NULL + len > 0 => REMOTE_CONTINUE (Sending back data)
+ * data != NULL + len == 0 => REMOTE_CONTINUE (Sending read EOF)
+ * data == NULL => REMOTE_OK (Sending finish handshake confirmation)
+ */
+ msg->hdr.status = data ? REMOTE_CONTINUE : REMOTE_OK;
+
+ if (remoteEncodeClientMessageHeader(msg) < 0)
+ goto fatal_error;
+
+ if (data && len) {
+ if ((msg->bufferLength - msg->bufferOffset) < len)
+ goto fatal_error;
+
+ /* Now for the payload */
+ xdrmem_create (&xdr,
+ msg->buffer,
+ msg->bufferLength,
+ XDR_ENCODE);
+
+ /* Skip over existing header already written */
+ if (xdr_setpos(&xdr, msg->bufferOffset) == 0)
+ goto xdr_error;
+
+ memcpy(msg->buffer + msg->bufferOffset, data, len);
+ msg->bufferOffset += len;
+
+ /* Update the length word. */
+ len = msg->bufferOffset;
+ if (xdr_setpos (&xdr, 0) == 0)
+ goto xdr_error;
+
+ if (!xdr_u_int (&xdr, &len))
+ goto xdr_error;
+
+ xdr_destroy (&xdr);
+
+ DEBUG("Total %d", msg->bufferOffset);
+ }
+
+ /* Reset ready for I/O */
+ msg->bufferLength = msg->bufferOffset;
+ msg->bufferOffset = 0;
+
+ /* Put reply on end of tx queue to send out */
+ qemudClientMessageQueuePush(&client->tx, msg);
+ qemudUpdateClientEvent(client);
+
+ return 0;
+
+xdr_error:
+ xdr_destroy (&xdr);
+fatal_error:
+ VIR_FREE(msg);
+ return -1;
+}
void remoteDispatchConnError (remote_error *rerr,
virConnectPtr conn);
+
+int
+remoteSerializeReplyError(struct qemud_client *client,
+ remote_error *rerr,
+ remote_message_header *req);
+int
+remoteSerializeStreamError(struct qemud_client *client,
+ remote_error *rerr,
+ int proc,
+ int serial);
+
/* Having this here is dubious. It should be in remote.h
* but qemud.c shouldn't depend on that header directly.
* Refactor this later to deal with this properly.
void *opaque);
+int
+remoteSendStreamData(struct qemud_client *client,
+ struct qemud_client_stream *stream,
+ const char *data,
+ size_t len);
+
#endif /* __LIBVIRTD_DISPATCH_H__ */
#include "conf.h"
#include "event.h"
#include "memory.h"
+#include "stream.h"
#ifdef HAVE_AVAHI
#include "mdns.h"
#endif
/* Check if any filters match this message */
filter = client->filters;
while (filter) {
- if ((filter->query)(msg, filter->opaque)) {
- qemudClientMessageQueuePush(&filter->dx, msg);
+ int ret;
+ ret = (filter->query)(client, msg, filter->opaque);
+ if (ret == 1) {
msg = NULL;
break;
+ } else if (ret == -1) {
+ VIR_FREE(msg);
+ qemudDispatchClientFailure(client);
+ return;
}
filter = filter->next;
}
}
+void
+qemudClientMessageRelease(struct qemud_client *client,
+ struct qemud_client_message *msg)
+{
+ if (!msg->async)
+ client->nrequests--;
+
+ /* See if the recv queue is currently throttled */
+ if (!client->rx &&
+ client->nrequests < max_client_requests) {
+ /* Reset message record for next RX attempt */
+ memset(msg, 0, sizeof(*msg));
+ client->rx = msg;
+ /* Get ready to receive next message */
+ client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+ } else {
+ VIR_FREE(msg);
+ }
+
+ qemudUpdateClientEvent(client);
+}
+
+
/*
* Process all queued client->tx messages until
* we would block on I/O
/* Get finished reply from head of tx queue */
reply = qemudClientMessageQueueServe(&client->tx);
- /* If its not an async message, then we have
- * just completed an RPC request */
- if (!reply->async)
- client->nrequests--;
-
- /* Move record to end of 'rx' ist */
- if (!client->rx &&
- client->nrequests < max_client_requests) {
- /* Reset message record for next RX attempt */
- client->rx = reply;
- client->rx->bufferOffset = 0;
- client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
- } else {
- VIR_FREE(reply);
- }
+ qemudClientMessageRelease(client, reply);
if (client->closing)
qemudDispatchClientFailure(client);
- else
- qemudUpdateClientEvent(client);
}
}
}
VIR_FREE(msg);
}
+ while (client->streams)
+ remoteRemoveClientStream(client, client->streams);
+
if (client->conn)
virConnectClose(client->conn);
virMutexDestroy(&client->lock);
unsigned int bufferLength;
unsigned int bufferOffset;
- int async : 1;
+ unsigned int async : 1;
remote_message_header hdr;
struct qemud_client_message *next;
};
+struct qemud_client;
+
/* Allow for filtering of incoming messages to a custom
* dispatch processing queue, instead of client->dx.
*/
-typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque);
+typedef int (*qemud_client_filter_func)(struct qemud_client *client,
+ struct qemud_client_message *msg, void *opaque);
struct qemud_client_filter {
qemud_client_filter_func query;
void *opaque;
- struct qemud_client_message *dx;
-
struct qemud_client_filter *next;
};
+struct qemud_client_stream {
+ virStreamPtr st;
+ int procedure;
+ int serial;
+
+ unsigned int recvEOF : 1;
+ unsigned int closed : 1;
+
+ struct qemud_client_filter filter;
+
+ struct qemud_client_message *rx;
+ int tx;
+
+ struct qemud_client_stream *next;
+};
+
/* Stores the per-client connection state */
struct qemud_client {
virMutex lock;
* end up on the 'dx' queue */
struct qemud_client_filter *filters;
+ /* Data streams */
+ struct qemud_client_stream *streams;
+
+
/* This is only valid if a remote open call has been made on this
* connection, otherwise it will be NULL. Also if remote close is
* called, it will be set back to NULL if that succeeds.
struct qemud_client_message *
qemudClientMessageQueueServe(struct qemud_client_message **queue);
+void
+qemudClientMessageRelease(struct qemud_client *client,
+ struct qemud_client_message *msg);
#if HAVE_POLKIT
--- /dev/null
+/*
+ * stream.c: APIs for managing client streams
+ *
+ * Copyright (C) 2009 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Daniel P. Berrange <berrange@redhat.com>
+ */
+
+
+#include <config.h>
+
+#include "stream.h"
+#include "memory.h"
+#include "dispatch.h"
+#include "logging.h"
+
+
+/*
+ * @client: a locked client object
+ *
+ * Invoked by the main loop when filtering incoming messages.
+ *
+ * Returns 1 if the message was processed, 0 if skipped,
+ * -1 on fatal client error
+ */
+static int
+remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED,
+ struct qemud_client_message *msg ATTRIBUTE_UNUSED,
+ void *opaque ATTRIBUTE_UNUSED)
+{
+ return 0;
+}
+
+
+/*
+ * @conn: a connection object to associate the stream with
+ * @hdr: the method call to associate with the stram
+ *
+ * Creates a new stream for this conn
+ *
+ * Returns a new stream object, or NULL upon OOM
+ */
+struct qemud_client_stream *
+remoteCreateClientStream(virConnectPtr conn,
+ remote_message_header *hdr)
+{
+ struct qemud_client_stream *stream;
+
+ DEBUG("proc=%d serial=%d", hdr->proc, hdr->serial);
+
+ if (VIR_ALLOC(stream) < 0)
+ return NULL;
+
+ stream->procedure = hdr->proc;
+ stream->serial = hdr->serial;
+
+ stream->st = virStreamNew(conn, VIR_STREAM_NONBLOCK);
+ if (!stream->st) {
+ VIR_FREE(stream);
+ return NULL;
+ }
+
+ stream->filter.query = remoteStreamFilter;
+ stream->filter.opaque = stream;
+
+ return stream;
+}
+
+/*
+ * @stream: an unused client stream
+ *
+ * Frees the memory associated with this inactive client
+ * stream
+ */
+void remoteFreeClientStream(struct qemud_client *client,
+ struct qemud_client_stream *stream)
+{
+ struct qemud_client_message *msg;
+
+ if (!stream)
+ return;
+
+ DEBUG("proc=%d serial=%d", stream->procedure, stream->serial);
+
+ msg = stream->rx;
+ while (msg) {
+ struct qemud_client_message *tmp = msg->next;
+ qemudClientMessageRelease(client, msg);
+ msg = tmp;
+ }
+
+ virStreamFree(stream->st);
+ VIR_FREE(stream);
+}
+
+
+/*
+ * @client: a locked client to add the stream to
+ * @stream: a stream to add
+ */
+int remoteAddClientStream(struct qemud_client *client,
+ struct qemud_client_stream *stream)
+{
+ struct qemud_client_stream *tmp = client->streams;
+
+ DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial);
+
+ if (tmp) {
+ while (tmp->next)
+ tmp = tmp->next;
+ tmp->next = stream;
+ } else {
+ client->streams = stream;
+ }
+
+ stream->filter.next = client->filters;
+ client->filters = &stream->filter;
+
+ stream->tx = 1;
+
+ return 0;
+}
+
+
+/*
+ * @client: a locked client object
+ * @procedure: procedure associated with the stream
+ * @serial: serial number associated with the stream
+ *
+ * Finds a existing active stream
+ *
+ * Returns a stream object matching the procedure+serial number, or NULL
+ */
+struct qemud_client_stream *
+remoteFindClientStream(struct qemud_client *client,
+ virStreamPtr st)
+{
+ struct qemud_client_stream *stream = client->streams;
+
+ while (stream) {
+ if (stream->st == st)
+ return stream;
+ stream = stream->next;
+ }
+
+ return NULL;
+}
+
+
+/*
+ * @client: a locked client object
+ * @stream: an inactive, closed stream object
+ *
+ * Removes a stream from the list of active streams for the client
+ *
+ * Returns 0 if the stream was removd, -1 if it doesn't exist
+ */
+int
+remoteRemoveClientStream(struct qemud_client *client,
+ struct qemud_client_stream *stream)
+{
+ DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial);
+
+ struct qemud_client_stream *curr = client->streams;
+ struct qemud_client_stream *prev = NULL;
+ struct qemud_client_filter *filter = NULL;
+
+ if (client->filters == &stream->filter) {
+ client->filters = client->filters->next;
+ } else {
+ filter = client->filters;
+ while (filter) {
+ if (filter->next == &stream->filter) {
+ filter->next = filter->next->next;
+ break;
+ }
+ }
+ }
+
+ if (!stream->closed)
+ virStreamAbort(stream->st);
+
+ while (curr) {
+ if (curr == stream) {
+ if (prev)
+ prev->next = curr->next;
+ else
+ client->streams = curr->next;
+ remoteFreeClientStream(client, stream);
+ return 0;
+ }
+ prev = curr;
+ curr = curr->next;
+ }
+ return -1;
+}
--- /dev/null
+/*
+ * stream.h: APIs for managing client streams
+ *
+ * Copyright (C) 2009 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Daniel P. Berrange <berrange@redhat.com>
+ */
+
+
+#ifndef __LIBVIRTD_STREAM_H__
+#define __LIBVIRTD_STREAM_H__
+
+#include "libvirtd.h"
+
+
+
+struct qemud_client_stream *
+remoteCreateClientStream(virConnectPtr conn,
+ remote_message_header *hdr);
+
+void remoteFreeClientStream(struct qemud_client *client,
+ struct qemud_client_stream *stream);
+
+int remoteAddClientStream(struct qemud_client *client,
+ struct qemud_client_stream *stream);
+
+struct qemud_client_stream *
+remoteFindClientStream(struct qemud_client *client,
+ virStreamPtr stream);
+
+int
+remoteRemoveClientStream(struct qemud_client *client,
+ struct qemud_client_stream *stream);
+
+#endif /* __LIBVIRTD_STREAM_H__ */
#include "internal.h"
#include <arpa/inet.h>
#define REMOTE_MESSAGE_MAX 262144
+#define REMOTE_MESSAGE_HEADER_MAX 24
+#define REMOTE_MESSAGE_PAYLOAD_MAX 262120
#define REMOTE_STRING_MAX 65536
typedef char *remote_nonnull_string;
REMOTE_CALL = 0,
REMOTE_REPLY = 1,
REMOTE_MESSAGE = 2,
+ REMOTE_STREAM = 3,
};
typedef enum remote_message_type remote_message_type;
enum remote_message_status {
REMOTE_OK = 0,
REMOTE_ERROR = 1,
+ REMOTE_CONTINUE = 2,
};
typedef enum remote_message_status remote_message_status;
#define REMOTE_MESSAGE_HEADER_XDR_LEN 4
/* Maximum total message size (serialised). */
const REMOTE_MESSAGE_MAX = 262144;
+/* Size of struct remote_message_header (serialized)*/
+const REMOTE_MESSAGE_HEADER_MAX = 24;
+
+/* Size of message payload */
+const REMOTE_MESSAGE_PAYLOAD_MAX = 262120;
+
/* Length of long, but not unbounded, strings.
* This is an arbitrary limit designed to stop the decoder from trying
* to allocate unbounded amounts of memory when fed with a bad message.
* * serial matches that from the corresponding REMOTE_CALL
*
* - type == REMOTE_MESSAGE
- * * serial matches that from the corresponding REMOTE_CALL, or zero
+ * * serial is always zero
+ *
+ * - type == REMOTE_STREAM
+ * * serial matches that from the corresponding REMOTE_CALL
*
+ * and the 'status' field varies according to:
+ *
+ * - type == REMOTE_CALL
+ * * REMOTE_OK always
+ *
+ * - type == REMOTE_REPLY
+ * * REMOTE_OK if RPC finished successfully
+ * * REMOTE_ERROR if something failed
+ *
+ * - type == REMOTE_MESSAGE
+ * * REMOTE_OK always
+ *
+ * - type == REMOTE_STREAM
+ * * REMOTE_CONTINUE if more data is following
+ * * REMOTE_OK if stream is complete
+ * * REMOTE_ERROR if stream had an error
*
* Payload varies according to type and status:
*
* * status == REMOTE_ERROR
* remote_error Error information
*
+ * - type == REMOTE_STREAM
+ * * status == REMOTE_CONTINUE
+ * byte[] raw stream data
+ * * status == REMOTE_ERROR
+ * remote_error error information
+ * * status == REMOTE_OK
+ * <empty>
*/
enum remote_message_type {
/* client -> server. args from a method call */
/* server -> client. reply/error from a method call */
REMOTE_REPLY = 1,
/* either direction. async notification */
- REMOTE_MESSAGE = 2
+ REMOTE_MESSAGE = 2,
+ /* either direction. stream data packet */
+ REMOTE_STREAM = 3
};
enum remote_message_status {
/* For replies, indicates that an error happened, and a struct
* remote_error follows.
*/
- REMOTE_ERROR = 1
+ REMOTE_ERROR = 1,
+
+ /* For streams, indicates that more data is still expected
+ */
+ REMOTE_CONTINUE = 2
};
/* 4 byte length word per header */