int events,
void *opaque);
+/* Append a call to the end of the list */
+static void virNetClientCallQueue(virNetClientCallPtr *head,
+ virNetClientCallPtr call)
+{
+ virNetClientCallPtr tmp = *head;
+ while (tmp && tmp->next) {
+ tmp = tmp->next;
+ }
+ if (tmp)
+ tmp->next = call;
+ else
+ *head = call;
+ call->next = NULL;
+}
+
+#if 0
+/* Obtain a call from the head of the list */
+static virNetClientCallPtr virNetClientCallServe(virNetClientCallPtr *head)
+{
+ virNetClientCallPtr tmp = *head;
+ if (tmp)
+ *head = tmp->next;
+ else
+ *head = NULL;
+ tmp->next = NULL;
+ return tmp;
+}
+#endif
+
+/* Remove a call from anywhere in the list */
+static void virNetClientCallRemove(virNetClientCallPtr *head,
+ virNetClientCallPtr call)
+{
+ virNetClientCallPtr tmp = *head;
+ virNetClientCallPtr prev = NULL;
+ while (tmp) {
+ if (tmp == call) {
+ if (prev)
+ prev->next = tmp->next;
+ else
+ *head = tmp->next;
+ tmp->next = NULL;
+ return;
+ }
+ prev = tmp;
+ tmp = tmp->next;
+ }
+}
+
+/* Predicate returns true if matches */
+typedef bool (*virNetClientCallPredicate)(virNetClientCallPtr call, void *opaque);
+
+/* Remove a list of calls from the list based on a predicate */
+static void virNetClientCallRemovePredicate(virNetClientCallPtr *head,
+ virNetClientCallPredicate pred,
+ void *opaque)
+{
+ virNetClientCallPtr tmp = *head;
+ virNetClientCallPtr prev = NULL;
+ while (tmp) {
+ virNetClientCallPtr next = tmp->next;
+ tmp->next = NULL; /* Temp unlink */
+ if (pred(tmp, opaque)) {
+ if (prev)
+ prev->next = next;
+ else
+ *head = next;
+ } else {
+ tmp->next = next; /* Reverse temp unlink */
+ prev = tmp;
+ }
+ tmp = next;
+ }
+}
+
+/* Returns true if the predicate matches at least one call in the list */
+static bool virNetClientCallMatchPredicate(virNetClientCallPtr head,
+ virNetClientCallPredicate pred,
+ void *opaque)
+{
+ virNetClientCallPtr tmp = head;
+ while (tmp) {
+ if (pred(tmp, opaque)) {
+ return true;
+ }
+ tmp = tmp->next;
+ }
+ return false;
+}
+
+
static void virNetClientEventFree(void *opaque)
{
virNetClientPtr client = opaque;
}
+static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
+ void *opaque)
+{
+ struct pollfd *fd = opaque;
+
+ if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
+ fd->events |= POLLIN;
+ if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
+ fd->events |= POLLOUT;
+
+ return false;
+}
+
+
+static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
+ void *opaque)
+{
+ virNetClientCallPtr thiscall = opaque;
+
+ if (call == thiscall)
+ return false;
+
+ if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE)
+ return false;
+
+ /*
+ * ...they won't actually wakeup until
+ * we release our mutex a short while
+ * later...
+ */
+ VIR_DEBUG("Waking up sleeping call %p", call);
+ virCondSignal(&call->cond);
+
+ return true;
+}
+
/*
* Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck
fds[1].fd = client->wakeupReadFD;
for (;;) {
- virNetClientCallPtr tmp = client->waitDispatch;
- virNetClientCallPtr prev;
char ignore;
sigset_t oldmask, blockedsigs;
int timeout = -1;
fds[1].events = fds[1].revents = 0;
fds[1].events = POLLIN;
- while (tmp) {
- if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
- fds[0].events |= POLLIN;
- if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
- fds[0].events |= POLLOUT;
- tmp = tmp->next;
- }
+ /* Calculate poll events for calls */
+ virNetClientCallMatchPredicate(client->waitDispatch,
+ virNetClientIOEventLoopPollEvents,
+ &fds[0]);
/* We have to be prepared to receive stream data
* regardless of whether any of the calls waiting
/* Iterate through waiting threads and if
* any are complete then tell 'em to wakeup
*/
- tmp = client->waitDispatch;
- prev = NULL;
- while (tmp) {
- if (tmp != thiscall &&
- tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
- /* Take them out of the list */
- if (prev)
- prev->next = tmp->next;
- else
- client->waitDispatch = tmp->next;
-
- /* And wake them up....
- * ...they won't actually wakeup until
- * we release our mutex a short while
- * later...
- */
- VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch);
- virCondSignal(&tmp->cond);
- } else {
- prev = tmp;
- }
- tmp = tmp->next;
- }
+ virNetClientCallRemovePredicate(&client->waitDispatch,
+ virNetClientIOEventLoopRemoveDone,
+ thiscall);
/* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
- /* We're at head of the list already, so
- * remove us
- */
- client->waitDispatch = thiscall->next;
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
+
VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
+
/* See if someone else is still waiting
* and if so, then pass the buck ! */
if (client->waitDispatch) {
error:
- client->waitDispatch = thiscall->next;
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch);
/* See if someone else is still waiting
* and if so, then pass the buck ! */
/* Check to see if another thread is dispatching */
if (client->waitDispatch) {
- /* Stick ourselves on the end of the wait queue */
- virNetClientCallPtr tmp = client->waitDispatch;
char ignore = 1;
- while (tmp && tmp->next)
- tmp = tmp->next;
- if (tmp)
- tmp->next = thiscall;
- else
- client->waitDispatch = thiscall;
+ /* Stick ourselves on the end of the wait queue */
+ virNetClientCallQueue(&client->waitDispatch, thiscall);
/* Force other thread to wakeup from poll */
if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
- if (tmp)
- tmp->next = NULL;
- else
- client->waitDispatch = NULL;
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
virReportSystemError(errno, "%s",
_("failed to wake up polling thread"));
return -1;
VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
/* Go to sleep while other thread is working... */
if (virCondWait(&thiscall->cond, &client->lock) < 0) {
- if (client->waitDispatch == thiscall) {
- client->waitDispatch = thiscall->next;
- } else {
- tmp = client->waitDispatch;
- while (tmp && tmp->next &&
- tmp->next != thiscall) {
- tmp = tmp->next;
- }
- if (tmp && tmp->next == thiscall)
- tmp->next = thiscall->next;
- }
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("failed to wait on condition"));
return -1;
}
/* Grr, someone passed the buck onto us ... */
-
} else {
- /* We're first to catch the buck */
- client->waitDispatch = thiscall;
+ /* We're the first to arrive */
+ virNetClientCallQueue(&client->waitDispatch, thiscall);
}
VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);