]> xenbits.xensource.com Git - libvirt.git/commitdiff
admin: Introduce virAdmServerSetThreadPoolParameters
authorErik Skultety <eskultet@redhat.com>
Mon, 22 Feb 2016 13:24:04 +0000 (14:24 +0100)
committerErik Skultety <eskultet@redhat.com>
Mon, 18 Apr 2016 15:07:46 +0000 (17:07 +0200)
Since threadpool increments the current number of threads according to current
load, i.e. how many jobs are waiting in the queue. The count however, is
constrained by max and min limits of workers. The logic of this new API works
like this:
    1) setting the minimum
        a) When the limit is increased, depending on the current number of
           threads, new threads are possibly spawned if the current number of
           threads is less than the new minimum limit
        b) Decreasing the minimum limit has no possible effect on the current
           number of threads
    2) setting the maximum
        a) Icreasing the maximum limit has no immediate effect on the current
           number of threads, it only allows the threadpool to spawn more
           threads when new jobs, that would otherwise end up queued, arrive.
        b) Decreasing the maximum limit may affect the current number of
           threads, if the current number of threads is less than the new
           maximum limit. Since there may be some ongoing time-consuming jobs
           that would effectively block this API from killing any threads.
           Therefore, this API is asynchronous with best-effort execution,
           i.e. the necessary number of workers will be terminated once they
           finish their previous job, unless other workers had already
           terminated, decreasing the limit to the requested value.
    3) setting priority workers
        - both increase and decrease in count of these workers have an
          immediate impact on the current number of workers, new ones will be
          spawned or some of them get terminated respectively.

Signed-off-by: Erik Skultety <eskultet@redhat.com>
15 files changed:
daemon/admin.c
daemon/admin_server.c
daemon/admin_server.h
include/libvirt/libvirt-admin.h
src/admin/admin_protocol.x
src/admin/admin_remote.c
src/admin_protocol-structs
src/libvirt-admin.c
src/libvirt_admin_private.syms
src/libvirt_admin_public.syms
src/libvirt_private.syms
src/rpc/virnetserver.c
src/rpc/virnetserver.h
src/util/virthreadpool.c
src/util/virthreadpool.h

index 4589eb618f026c30e8f89bc1f02af02cf370ea8a..00e7dc3c00acf97a0f538301bf217b0fc0ac7f10 100644 (file)
@@ -178,4 +178,47 @@ adminDispatchServerGetThreadpoolParameters(virNetServerPtr server ATTRIBUTE_UNUS
     virObjectUnref(srv);
     return rv;
 }
+
+static int
+adminDispatchServerSetThreadpoolParameters(virNetServerPtr server ATTRIBUTE_UNUSED,
+                                           virNetServerClientPtr client,
+                                           virNetMessagePtr msg ATTRIBUTE_UNUSED,
+                                           virNetMessageErrorPtr rerr,
+                                           struct admin_server_set_threadpool_parameters_args *args)
+{
+    int rv = -1;
+    virNetServerPtr srv = NULL;
+    virTypedParameterPtr params = NULL;
+    int nparams = 0;
+    struct daemonAdmClientPrivate *priv =
+        virNetServerClientGetPrivateData(client);
+
+    if (!(srv = virNetDaemonGetServer(priv->dmn, args->srv.name))) {
+        virReportError(VIR_ERR_NO_SERVER,
+                       _("no server with matching name '%s' found"),
+                       args->srv.name);
+        goto cleanup;
+    }
+
+    if (virTypedParamsDeserialize((virTypedParameterRemotePtr) args->params.params_val,
+                                  args->params.params_len,
+                                  ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX,
+                                  &params,
+                                  &nparams) < 0)
+        goto cleanup;
+
+
+    if (adminServerSetThreadPoolParameters(srv, params,
+                                           nparams, args->flags) < 0)
+        goto cleanup;
+
+    rv = 0;
+ cleanup:
+    if (rv < 0)
+        virNetMessageSaveError(rerr);
+
+    virTypedParamsFree(params, nparams);
+    virObjectUnref(srv);
+    return rv;
+}
 #include "admin_dispatch.h"
index 10c00f6386b4cc2c6d8494bb102144d92cff38dc..e39a9bd805e751608c8efc967a1ef9ce8469f15c 100644 (file)
@@ -32,6 +32,7 @@
 #include "virnetserver.h"
 #include "virstring.h"
 #include "virthreadpool.h"
+#include "virtypedparam.h"
 
 #define VIR_FROM_THIS VIR_FROM_ADMIN
 
@@ -135,3 +136,45 @@ adminServerGetThreadPoolParameters(virNetServerPtr srv,
     virTypedParamsFree(tmpparams, *nparams);
     return ret;
 }
+
+int
+adminServerSetThreadPoolParameters(virNetServerPtr srv,
+                                   virTypedParameterPtr params,
+                                   int nparams,
+                                   unsigned int flags)
+{
+    long long int minWorkers = -1;
+    long long int maxWorkers = -1;
+    long long int prioWorkers = -1;
+    virTypedParameterPtr param = NULL;
+
+    virCheckFlags(0, -1);
+
+    if (virTypedParamsValidate(params, nparams,
+                               VIR_THREADPOOL_WORKERS_MIN,
+                               VIR_TYPED_PARAM_UINT,
+                               VIR_THREADPOOL_WORKERS_MAX,
+                               VIR_TYPED_PARAM_UINT,
+                               VIR_THREADPOOL_WORKERS_PRIORITY,
+                               VIR_TYPED_PARAM_UINT,
+                               NULL) < 0)
+        return -1;
+
+    if ((param = virTypedParamsGet(params, nparams,
+                                   VIR_THREADPOOL_WORKERS_MIN)))
+        minWorkers = param->value.ui;
+
+    if ((param = virTypedParamsGet(params, nparams,
+                                   VIR_THREADPOOL_WORKERS_MAX)))
+        maxWorkers = param->value.ui;
+
+    if ((param = virTypedParamsGet(params, nparams,
+                                   VIR_THREADPOOL_WORKERS_PRIORITY)))
+        prioWorkers = param->value.ui;
+
+    if (virNetServerSetThreadPoolParameters(srv, minWorkers,
+                                            maxWorkers, prioWorkers) < 0)
+        return -1;
+
+    return 0;
+}
index 2ddaecc707e9a17e976e23ff2c107e99ccf71294..756e049ff0591d2f5f2270b26058a3ec8af46dba 100644 (file)
@@ -40,5 +40,10 @@ adminServerGetThreadPoolParameters(virNetServerPtr srv,
                                    virTypedParameterPtr *params,
                                    int *nparams,
                                    unsigned int flags);
+int
+adminServerSetThreadPoolParameters(virNetServerPtr srv,
+                                   virTypedParameterPtr params,
+                                   int nparams,
+                                   unsigned int flags);
 
 #endif /* __LIBVIRTD_ADMIN_SERVER_H__ */
index bb132505138c2928f38a5f1a5021fb9a44402677..bce6034d97ac03c45a4bb92b76027724e2c9d271 100644 (file)
@@ -177,6 +177,11 @@ int virAdmServerGetThreadPoolParameters(virAdmServerPtr srv,
                                         int *nparams,
                                         unsigned int flags);
 
+int virAdmServerSetThreadPoolParameters(virAdmServerPtr srv,
+                                        virTypedParameterPtr params,
+                                        int nparams,
+                                        unsigned int flags);
+
 # ifdef __cplusplus
 }
 # endif
index b1093d8889b44878b88b4f8902d096abcee0967e..c701698e9cf080b5808db392a4ba916f8bf034ef 100644 (file)
@@ -110,6 +110,12 @@ struct admin_server_get_threadpool_parameters_ret {
     admin_typed_param params<ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX>;
 };
 
+struct admin_server_set_threadpool_parameters_args {
+    admin_nonnull_server srv;
+    admin_typed_param params<ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX>;
+    unsigned int flags;
+};
+
 /* Define the program number, protocol version and procedure numbers here. */
 const ADMIN_PROGRAM = 0x06900690;
 const ADMIN_PROTOCOL_VERSION = 1;
@@ -160,5 +166,10 @@ enum admin_procedure {
     /**
      * @generate: none
      */
-    ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6
+    ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6,
+
+    /**
+     * @generate: none
+     */
+    ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7
 };
index ac38ce91a9469a63f03d616604a48f21d6837b08..2dd692b342abf640098f686d4878838edd6434ca 100644 (file)
@@ -267,3 +267,38 @@ remoteAdminServerGetThreadPoolParameters(virAdmServerPtr srv,
     virObjectUnlock(priv);
     return rv;
 }
+
+static int
+remoteAdminServerSetThreadPoolParameters(virAdmServerPtr srv,
+                                         virTypedParameterPtr params,
+                                         int nparams,
+                                         unsigned int flags)
+{
+    int rv = -1;
+    remoteAdminPrivPtr priv = srv->conn->privateData;
+    admin_server_set_threadpool_parameters_args args;
+
+    args.flags = flags;
+    make_nonnull_server(&args.srv, srv);
+
+    virObjectLock(priv);
+
+    if (virTypedParamsSerialize(params, nparams,
+                                (virTypedParameterRemotePtr *) &args.params.params_val,
+                                &args.params.params_len,
+                                0) < 0)
+        goto cleanup;
+
+
+    if (call(srv->conn, 0, ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS,
+             (xdrproc_t)xdr_admin_server_set_threadpool_parameters_args, (char *) &args,
+             (xdrproc_t)xdr_void, (char *) NULL) == -1)
+        goto cleanup;
+
+    rv = 0;
+ cleanup:
+    virTypedParamsRemoteFree((virTypedParameterRemotePtr) args.params.params_val,
+                             args.params.params_len);
+    virObjectUnlock(priv);
+    return rv;
+}
index c4e679a900d8ca2016e828ee52e4526c68163834..650d31d06a2a46bb46dbb1ac3d136b8e67b482b2 100644 (file)
@@ -61,6 +61,14 @@ struct admin_server_get_threadpool_parameters_ret {
                 admin_typed_param * params_val;
         } params;
 };
+struct admin_server_set_threadpool_parameters_args {
+        admin_nonnull_server       srv;
+        struct {
+                u_int              params_len;
+                admin_typed_param * params_val;
+        } params;
+        u_int                      flags;
+};
 enum admin_procedure {
         ADMIN_PROC_CONNECT_OPEN = 1,
         ADMIN_PROC_CONNECT_CLOSE = 2,
@@ -68,4 +76,5 @@ enum admin_procedure {
         ADMIN_PROC_CONNECT_LIST_SERVERS = 4,
         ADMIN_PROC_CONNECT_LOOKUP_SERVER = 5,
         ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6,
+        ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7,
 };
index 07d46c4db5b24fe8f8c7b8177e297703e8c59f25..df71649c2eba0966049646c2ea1da1c74159c082 100644 (file)
@@ -721,3 +721,40 @@ virAdmServerGetThreadPoolParameters(virAdmServerPtr srv,
     virDispatchError(NULL);
     return -1;
 }
+
+/**
+ * virAdmServerSetThreadPoolParameters:
+ * @srv: a valid server object reference
+ * @params: pointer to threadpool typed parameter objects
+ * @nparams: number of parameters in @params
+ * @flags: extra flags; not used yet, so callers should always pass 0
+ *
+ * Change server threadpool parameters according to @params. Note that some
+ * tunables are read-only, thus any attempt to set them will result in a
+ * failure.
+ *
+ * Returns 0 on success, -1 in case of an error.
+ */
+int
+virAdmServerSetThreadPoolParameters(virAdmServerPtr srv,
+                                    virTypedParameterPtr params,
+                                    int nparams,
+                                    unsigned int flags)
+{
+    VIR_DEBUG("srv=%p, params=%p, nparams=%x, flags=%x",
+              srv, params, nparams, flags);
+
+    virResetLastError();
+
+    virCheckAdmServerReturn(srv, -1);
+    virCheckNonNullArgGoto(params, error);
+
+    if (remoteAdminServerSetThreadPoolParameters(srv, params,
+                                                 nparams, flags) < 0)
+        goto error;
+
+    return 0;
+ error:
+    virDispatchError(NULL);
+    return -1;
+}
index b05067c7e34b69d0f74b69b71ae0ee02a260f6d4..b150d8a86144b853c60b2568811ab59e274265b7 100644 (file)
@@ -14,6 +14,7 @@ xdr_admin_connect_lookup_server_ret;
 xdr_admin_connect_open_args;
 xdr_admin_server_get_threadpool_parameters_args;
 xdr_admin_server_get_threadpool_parameters_ret;
+xdr_admin_server_set_threadpool_parameters_args;
 
 # datatypes.h
 virAdmConnectClass;
index 0a12b5fb3f0a7db0bd6e97ba36d0dcce5c34f732..0a164448166f270af7b40fe549a0fe73ff541415 100644 (file)
@@ -26,4 +26,5 @@ LIBVIRT_ADMIN_1.3.0 {
         virAdmServerGetThreadPoolParameters;
         virAdmServerFree;
         virAdmConnectLookupServer;
+        virAdmServerSetThreadPoolParameters;
 };
index 6d90eca6758f620f04cd274113856277a0527a2e..f046fbff9715595ef019cdfa426d71dd7a0b37cb 100644 (file)
@@ -2373,6 +2373,7 @@ virThreadPoolGetMinWorkers;
 virThreadPoolGetPriorityWorkers;
 virThreadPoolNewFull;
 virThreadPoolSendJob;
+virThreadPoolSetParameters;
 
 
 # util/virtime.h
index 3878547700df5e0eef752c069c3ea6f2c3fb365b..57bd95c22e851c11f9065c3624b6db322b6e65bd 100644 (file)
@@ -899,3 +899,18 @@ virNetServerGetThreadPoolParameters(virNetServerPtr srv,
     virObjectUnlock(srv);
     return 0;
 }
+
+int
+virNetServerSetThreadPoolParameters(virNetServerPtr srv,
+                                    long long int minWorkers,
+                                    long long int maxWorkers,
+                                    long long int prioWorkers)
+{
+    int ret;
+
+    virObjectLock(srv);
+    ret = virThreadPoolSetParameters(srv->workers, minWorkers,
+                                     maxWorkers, prioWorkers);
+    virObjectUnlock(srv);
+    return ret;
+}
index 6f17d1cc4cf074a15e61ac745a52ed97e41bf9ac..8b304f68e7f79a10f4b8b8e6f8572202fd3cf35c 100644 (file)
@@ -97,4 +97,9 @@ int virNetServerGetThreadPoolParameters(virNetServerPtr srv,
                                         size_t *nPrioWorkers,
                                         size_t *jobQueueDepth);
 
+int virNetServerSetThreadPoolParameters(virNetServerPtr srv,
+                                        long long int minWorkers,
+                                        long long int maxWorkers,
+                                        long long int prioWorkers);
+
 #endif /* __VIR_NET_SERVER_H__ */
index fec8620c10bb84059d243f9c2fed64438af3944d..10f2bd2c3a03ac1186d393258b643bcea2cadf96 100644 (file)
@@ -73,6 +73,7 @@ struct _virThreadPool {
     size_t nWorkers;
     virThreadPtr workers;
 
+    size_t maxPrioWorkers;
     size_t nPrioWorkers;
     virThreadPtr prioWorkers;
     virCond prioCond;
@@ -84,12 +85,22 @@ struct virThreadPoolWorkerData {
     bool priority;
 };
 
+/* Test whether the worker needs to quit if the current number of workers @count
+ * is greater than @limit actually allows.
+ */
+static inline bool virThreadPoolWorkerQuitHelper(size_t count, size_t limit)
+{
+    return count > limit;
+}
+
 static void virThreadPoolWorker(void *opaque)
 {
     struct virThreadPoolWorkerData *data = opaque;
     virThreadPoolPtr pool = data->pool;
     virCondPtr cond = data->cond;
     bool priority = data->priority;
+    size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
+    size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers;
     virThreadPoolJobPtr job = NULL;
 
     VIR_FREE(data);
@@ -97,6 +108,14 @@ static void virThreadPoolWorker(void *opaque)
     virMutexLock(&pool->mutex);
 
     while (1) {
+        /* In order to support async worker termination, we need ensure that
+         * both busy and free workers know if they need to terminated. Thus,
+         * busy workers need to check for this fact before they start waiting for
+         * another job (and before taking another one from the queue); and
+         * free workers need to check for this right after waking up.
+         */
+        if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
+            goto out;
         while (!pool->quit &&
                ((!priority && !pool->jobList.head) ||
                 (priority && !pool->jobList.firstPrio))) {
@@ -109,6 +128,9 @@ static void virThreadPoolWorker(void *opaque)
             }
             if (!priority)
                 pool->freeWorkers--;
+
+            if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
+                goto out;
         }
 
         if (pool->quit)
@@ -160,12 +182,12 @@ static void virThreadPoolWorker(void *opaque)
 static int
 virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority)
 {
-    virThreadPtr workers = priority ? pool->prioWorkers : pool->workers;
+    virThreadPtr *workers = priority ? &pool->prioWorkers : &pool->workers;
     size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
     size_t i = 0;
     struct virThreadPoolWorkerData *data = NULL;
 
-    if (VIR_EXPAND_N(workers, *curWorkers, gain) < 0)
+    if (VIR_EXPAND_N(*workers, *curWorkers, gain) < 0)
         return -1;
 
     for (i = 0; i < gain; i++) {
@@ -176,7 +198,7 @@ virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority)
         data->cond = priority ? &pool->prioCond : &pool->cond;
         data->priority = priority;
 
-        if (virThreadCreateFull(&workers[i],
+        if (virThreadCreateFull(&(*workers)[i],
                                 false,
                                 virThreadPoolWorker,
                                 pool->jobFuncName,
@@ -226,6 +248,7 @@ virThreadPoolNewFull(size_t minWorkers,
 
     pool->minWorkers = minWorkers;
     pool->maxWorkers = maxWorkers;
+    pool->maxPrioWorkers = prioWorkers;
 
     if (virThreadPoolExpand(pool, minWorkers, false) < 0)
         goto error;
@@ -399,3 +422,54 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
     virMutexUnlock(&pool->mutex);
     return -1;
 }
+
+int
+virThreadPoolSetParameters(virThreadPoolPtr pool,
+                           long long int minWorkers,
+                           long long int maxWorkers,
+                           long long int prioWorkers)
+{
+    size_t max;
+    size_t min;
+
+    virMutexLock(&pool->mutex);
+
+    max = maxWorkers >= 0 ? maxWorkers : pool->maxWorkers;
+    min = minWorkers >= 0 ? minWorkers : pool->minWorkers;
+    if (min > max) {
+        virReportError(VIR_ERR_INVALID_ARG, "%s",
+                       _("minWorkers cannot be larger than maxWorkers"));
+        goto error;
+    }
+
+    if (minWorkers >= 0) {
+        if ((size_t) minWorkers > pool->nWorkers &&
+            virThreadPoolExpand(pool, minWorkers - pool->nWorkers,
+                                false) < 0)
+            goto error;
+        pool->minWorkers = minWorkers;
+    }
+
+    if (maxWorkers >= 0) {
+        pool->maxWorkers = maxWorkers;
+        virCondBroadcast(&pool->cond);
+    }
+
+    if (prioWorkers >= 0) {
+        if (prioWorkers < pool->nPrioWorkers) {
+            virCondBroadcast(&pool->prioCond);
+        } else if ((size_t) prioWorkers > pool->nPrioWorkers &&
+                   virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers,
+                                       true) < 0) {
+            goto error;
+        }
+        pool->maxPrioWorkers = prioWorkers;
+    }
+
+    virMutexUnlock(&pool->mutex);
+    return 0;
+
+ error:
+    virMutexUnlock(&pool->mutex);
+    return -1;
+}
index bc0c90771be02f6fd81993ad0f0743ca0cea85d1..e1f362f5bb605f0e3604f5c031e33a0b868a4b0e 100644 (file)
@@ -57,4 +57,9 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
                          void *jobdata) ATTRIBUTE_NONNULL(1)
                                         ATTRIBUTE_RETURN_CHECK;
 
+int virThreadPoolSetParameters(virThreadPoolPtr pool,
+                               long long int minWorkers,
+                               long long int maxWorkers,
+                               long long int prioWorkers);
+
 #endif