- Threading: the RULES.
- ====================
+ Threading in the libvirtd daemon
+ ================================
-If you don't understand this, don't touch the code. Ask for
-further advice / explanation on the mailing list first.
+To allow efficient processing of RPC requests, the libvirtd daemon
+makes use of threads.
+
+ - The process leader. This is the initial thread of control
+ when the daemon starts running. It is responsible for
+ initializing all the state, and starting the event loop.
+ Once that's all done, this thread does nothing except
+ wait for the event loop to quit, thus indicating an orderly
+ shutdown is required.
+
+ - The event loop. This thread runs the event loop, sitting
+ in poll() on all monitored file handles, and calculating
+ and dispatching any timers that may be registered. When
+ this thread quits, the entire daemon will shutdown.
+
+ - The workers. These 'n' threads all sit around waiting to
+ process incoming RPC requests. Since RPC requests may take
+ a long time to complete, with long idle periods, there will
+ be quite a few workers running.
+
+The use of threads obviously requires locking to ensure safety when
+accessing/changing data structures.
- the top level lock is on 'struct qemud_server'. This must be
held before acquiring any other lock
this as a caller of virEvent APIs.
-The server lock is only needed / used once the daemon has entered
-its main loop, which is the qemudRunLoop() . The initial thread
-acquires the lock upon entering this method.
-
-It immediatelty spawns 'n' worker threads, whose main loop is
-the qemudWorker() method. The workers will immediately try to
-acquire the server lock, and thus block since its held by the
-initial thread.
-
-When the initial thread enters the poll() call, it drops the
-server lock. The worker locks now each wakeup, acquire the
-server lock and go into a condition wait on the 'job' condition
-variable. The workers are now all 'primed' for incoming RPC
-calls.
-
-
-
-A file descriptor event now occurrs, causing the initial thread
-to exit poll(). It invokes the registered callback associated
-with the file descriptors on which the event occurrs. The callbacks
-are required to immediately acquire the server lock.
-
-If the callback is dealing with a client event, it will then
-acquire the client lock, and drop the server lock.
-
-The callback will now handle the I/O event, reading or writing
-a RPC message. Once a complete RPC message has been read the
-client is marked as being in state QEMUD_MODE_WAIT_DISPATCH,
-and the 'job' condition variable is signaled. The callback
-now drops the client lock and goes back into the poll() loop
-waiting for more I/O events.
+The server lock is used in conjunction with a condition variable
+to pass jobs from the event loop thread to the workers. The main
+event loop thread handles I/O from the client socket, and once a
+complete RPC message has been read off the wire (and optionally
+decrypted), it will be placed onto the 'dx' job queue for the
+associated client object. The job condition will be signalled and
+a worker will wakup and process it.
-Meanwhile one of the worker threads wakes up from its condition
-variable sleep, holding the server lock. It now searches for a
-client in state QEMUD_MODE_WAIT_DISPATCH. If it doesn't find
-one, it goes back to sleep. If it does find one, then it calls
-into the remoteDispatchClientRequest() method de-serialize the
-incoming message into an XDR object and invoke the helper method
-for the associated RPC call.
+The worker thread must quickly drop its locks on the server and
+client to allow the main event loop thread to continue running
+with its other work. Critically important, is that now libvirt
+API call will ever be made with the server or client locks held.
-While the helper method is executing, no locks are held on either
-the client or server, but the ref count on the 'struct qemud_client'
-object is incremented to ensure its not deleted. The helper can
-now safely invoke the necessary libvirt API call.
+-- End
case SIGQUIT:
case SIGTERM:
VIR_WARN(_("Shutting down on signal %d"), siginfo.si_signo);
- server->shutdown = 1;
+ server->quitEventThread = 1;
break;
default:
}
if (ret != 0)
- server->shutdown = 1;
+ server->quitEventThread = 1;
virMutexUnlock(&server->lock);
}
goto cleanup;
}
- if ((sock->watch = virEventAddHandleImpl(sock->fd,
- VIR_EVENT_HANDLE_READABLE |
- VIR_EVENT_HANDLE_ERROR |
- VIR_EVENT_HANDLE_HANGUP,
- qemudDispatchServerEvent,
- server, NULL)) < 0) {
- VIR_ERROR0(_("Failed to add server event callback"));
- goto cleanup;
- }
-
sock->next = server->sockets;
server->sockets = sock;
server->nsockets++;
virStrerror (errno, ebuf, sizeof ebuf));
goto cleanup;
}
-
- if ((sock->watch = virEventAddHandleImpl(sock->fd,
- VIR_EVENT_HANDLE_READABLE |
- VIR_EVENT_HANDLE_ERROR |
- VIR_EVENT_HANDLE_HANGUP,
- qemudDispatchServerEvent,
- server, NULL)) < 0) {
- VIR_ERROR0(_("Failed to add server event callback"));
- goto cleanup;
- }
-
}
return 0;
return -1;
}
+static int qemudNetworkEnable(struct qemud_server *server) {
+ struct qemud_socket *sock;
+
+ sock = server->sockets;
+ while (sock) {
+ if ((sock->watch = virEventAddHandleImpl(sock->fd,
+ VIR_EVENT_HANDLE_READABLE |
+ VIR_EVENT_HANDLE_ERROR |
+ VIR_EVENT_HANDLE_HANGUP,
+ qemudDispatchServerEvent,
+ server, NULL)) < 0) {
+ VIR_ERROR0(_("Failed to add server event callback"));
+ return -1;
+ }
+
+ sock = sock->next;
+ }
+ return 0;
+}
static gnutls_session_t
remoteInitializeTLSSession (void)
virEventUpdateTimeoutImpl(timerid, -1);
} else {
DEBUG0("Timer expired and inactive, shutting down");
- server->shutdown = 1;
+ server->quitEventThread = 1;
}
}
VIR_FREE(client);
}
-static int qemudRunLoop(struct qemud_server *server) {
+static void *qemudRunLoop(void *opaque) {
+ struct qemud_server *server = opaque;
int timerid = -1;
- int ret = -1, i;
+ int i;
int timerActive = 0;
virMutexLock(&server->lock);
qemudInactiveTimer,
server, NULL)) < 0) {
VIR_ERROR0(_("Failed to register shutdown timeout"));
- return -1;
+ return NULL;
}
if (min_workers > max_workers)
server->nworkers = max_workers;
if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
VIR_ERROR0(_("Failed to allocate workers"));
- return -1;
+ return NULL;
}
for (i = 0 ; i < min_workers ; i++) {
server->nactiveworkers++;
}
- for (;;) {
+ for (;!server->quitEventThread;) {
/* A shutdown timeout is specified, so check
* if any drivers have active state, if not
* shutdown after timeout seconds
server->nactiveworkers--;
}
}
-
- if (server->shutdown) {
- ret = 0;
- break;
- }
}
cleanup:
VIR_FREE(server->workers);
virMutexUnlock(&server->lock);
- return ret;
+ return NULL;
}
+
+static int qemudStartEventLoop(struct qemud_server *server) {
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ /* We want to join the eventloop, so don't detach it */
+ /*pthread_attr_setdetachstate(&attr, 1);*/
+
+ if (pthread_create(&server->eventThread,
+ &attr,
+ qemudRunLoop,
+ server) != 0)
+ return -1;
+
+ server->hasEventThread = 1;
+
+ return 0;
+}
+
+
static void qemudCleanup(struct qemud_server *server) {
struct qemud_socket *sock;
statuswrite = -1;
}
+ /* Start the event loop in a background thread, since
+ * state initialization needs events to be being processed */
+ if (qemudStartEventLoop(server) < 0) {
+ VIR_ERROR0("Event thread startup failed");
+ goto error;
+ }
+
/* Start the stateful HV drivers
* This is delibrately done after telling the parent process
* we're ready, since it can take a long time and this will
goto error;
}
- qemudRunLoop(server);
+ /* Start accepting new clients from network */
+ virMutexLock(&server->lock);
+ if (qemudNetworkEnable(server) < 0) {
+ VIR_ERROR0("Network event loop enablement failed");
+ goto shutdown;
+ }
+ virMutexUnlock(&server->lock);
+
ret = 0;
+shutdown:
+ /* In a non-0 shutdown scenario we need to tell event loop
+ * to quit immediately. Otherwise in normal case we just
+ * sit in the thread join forever. Sure this means the
+ * main thread doesn't do anything useful ever, but that's
+ * not too much of drain on resources
+ */
+ if (ret != 0) {
+ virMutexLock(&server->lock);
+ if (server->hasEventThread)
+ /* This SIGQUIT triggers the shutdown process */
+ kill(getpid(), SIGQUIT);
+ virMutexUnlock(&server->lock);
+ }
+ pthread_join(server->eventThread, NULL);
+
error:
if (statuswrite != -1) {
if (ret != 0) {