Commit
2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
{
virNetMessagePtr msg;
- if (!(msg = virNetMessageNew()))
+ if (!(msg = virNetMessageNew(false)))
goto cleanup;
msg->header.prog = virNetServerProgramGetID(program);
virNetError(VIR_ERR_RPC,
"%s", _("stream had I/O failure"));
- msg = virNetMessageNew();
+ msg = virNetMessageNew(false);
if (!msg) {
ret = -1;
} else {
virNetMessagePtr tmp = msg->next;
if (client) {
/* Send a dummy reply to free up 'msg' & unblock client rx */
- memset(msg, 0, sizeof(*msg));
+ virNetMessageClear(msg);
msg->header.type = VIR_NET_REPLY;
if (virNetServerClientSendMessage(client, msg) < 0) {
virNetServerClientImmediateClose(client);
* its active request count / throttling
*/
if (msg->header.status == VIR_NET_CONTINUE) {
- memset(msg, 0, sizeof(*msg));
+ virNetMessageClear(msg);
msg->header.type = VIR_NET_REPLY;
if (virNetServerClientSendMessage(client, msg) < 0) {
virNetMessageFree(msg);
memset(&rerr, 0, sizeof(rerr));
- if (!(msg = virNetMessageNew()))
+ if (!(msg = virNetMessageNew(false)))
ret = -1;
else
ret = virNetServerProgramSendStreamError(remoteProgram,
stream->tx = 0;
if (ret == 0)
stream->recvEOF = 1;
- if (!(msg = virNetMessageNew()))
+ if (!(msg = virNetMessageNew(false)))
ret = -1;
if (msg) {
{
virNetMessagePtr msg;
- if (!(msg = virNetMessageNew()))
+ if (!(msg = virNetMessageNew(false)))
return -1;
msg->header.prog = prog->program;
bool wantReply;
VIR_DEBUG("st=%p status=%d data=%p nbytes=%zu", st, status, data, nbytes);
- if (!(msg = virNetMessageNew()))
+ if (!(msg = virNetMessageNew(false)))
return -1;
virMutexLock(&st->lock);
goto cleanup;
}
- if (!(msg = virNetMessageNew())) {
+ if (!(msg = virNetMessageNew(false))) {
virReportOOMError();
goto cleanup;
}
virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \
__FUNCTION__, __LINE__, __VA_ARGS__)
-virNetMessagePtr virNetMessageNew(void)
+virNetMessagePtr virNetMessageNew(bool tracked)
{
virNetMessagePtr msg;
return NULL;
}
- VIR_DEBUG("msg=%p", msg);
+ msg->tracked = tracked;
+ VIR_DEBUG("msg=%p tracked=%d", msg, tracked);
return msg;
}
+
+void virNetMessageClear(virNetMessagePtr msg)
+{
+ bool tracked = msg->tracked;
+ memset(msg, 0, sizeof(*msg));
+ msg->tracked = tracked;
+}
+
+
void virNetMessageFree(virNetMessagePtr msg)
{
if (!msg)
* use virNetMessageNew() to allocate on the heap
*/
struct _virNetMessage {
+ bool tracked;
+
char buffer[VIR_NET_MESSAGE_MAX + VIR_NET_MESSAGE_LEN_MAX];
size_t bufferLength;
size_t bufferOffset;
};
-virNetMessagePtr virNetMessageNew(void);
+virNetMessagePtr virNetMessageNew(bool tracked);
+
+void virNetMessageClear(virNetMessagePtr);
void virNetMessageFree(virNetMessagePtr msg);
return -1;
}
- if (!(confirm = virNetMessageNew()))
+ if (!(confirm = virNetMessageNew(false)))
return -1;
/* Checks have succeeded. Write a '\1' byte back to the client to
virNetTLSContextRef(tls);
/* Prepare one for packet receive */
- if (!(client->rx = virNetMessageNew()))
+ if (!(client->rx = virNetMessageNew(true)))
goto error;
client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
client->nrequests = 1;
/* Possibly need to create another receive buffer */
if (client->nrequests < client->nrequests_max) {
- if (!(client->rx = virNetMessageNew())) {
+ if (!(client->rx = virNetMessageNew(true))) {
client->wantClose = true;
} else {
client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
/* Get finished msg from head of tx queue */
msg = virNetMessageQueueServe(&client->tx);
- if (msg->header.type == VIR_NET_REPLY ||
- (msg->header.type == VIR_NET_STREAM &&
- msg->header.status != VIR_NET_CONTINUE)) {
+ if (msg->tracked) {
client->nrequests--;
/* See if the recv queue is currently throttled */
if (!client->rx &&
client->nrequests < client->nrequests_max) {
/* Ready to recv more messages */
+ virNetMessageClear(msg);
client->rx = msg;
- memset(client->rx, 0, sizeof(*client->rx));
client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
msg = NULL;
client->nrequests++;
VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d",
msg->header.serial, msg->header.proc, msg->header.status);
/* Send a dummy reply to free up 'msg' & unblock client rx */
- memset(msg, 0, sizeof(*msg));
+ virNetMessageClear(msg);
msg->header.type = VIR_NET_REPLY;
if (virNetServerClientSendMessage(client, msg) < 0) {
ret = -1;