virObjectUnref(srv);
return rv;
}
+
+static int
+adminDispatchServerSetThreadpoolParameters(virNetServerPtr server ATTRIBUTE_UNUSED,
+ virNetServerClientPtr client,
+ virNetMessagePtr msg ATTRIBUTE_UNUSED,
+ virNetMessageErrorPtr rerr,
+ struct admin_server_set_threadpool_parameters_args *args)
+{
+ int rv = -1;
+ virNetServerPtr srv = NULL;
+ virTypedParameterPtr params = NULL;
+ int nparams = 0;
+ struct daemonAdmClientPrivate *priv =
+ virNetServerClientGetPrivateData(client);
+
+ if (!(srv = virNetDaemonGetServer(priv->dmn, args->srv.name))) {
+ virReportError(VIR_ERR_NO_SERVER,
+ _("no server with matching name '%s' found"),
+ args->srv.name);
+ goto cleanup;
+ }
+
+ if (virTypedParamsDeserialize((virTypedParameterRemotePtr) args->params.params_val,
+ args->params.params_len,
+ ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX,
+ ¶ms,
+ &nparams) < 0)
+ goto cleanup;
+
+
+ if (adminServerSetThreadPoolParameters(srv, params,
+ nparams, args->flags) < 0)
+ goto cleanup;
+
+ rv = 0;
+ cleanup:
+ if (rv < 0)
+ virNetMessageSaveError(rerr);
+
+ virTypedParamsFree(params, nparams);
+ virObjectUnref(srv);
+ return rv;
+}
#include "admin_dispatch.h"
#include "virnetserver.h"
#include "virstring.h"
#include "virthreadpool.h"
+#include "virtypedparam.h"
#define VIR_FROM_THIS VIR_FROM_ADMIN
virTypedParamsFree(tmpparams, *nparams);
return ret;
}
+
+int
+adminServerSetThreadPoolParameters(virNetServerPtr srv,
+ virTypedParameterPtr params,
+ int nparams,
+ unsigned int flags)
+{
+ long long int minWorkers = -1;
+ long long int maxWorkers = -1;
+ long long int prioWorkers = -1;
+ virTypedParameterPtr param = NULL;
+
+ virCheckFlags(0, -1);
+
+ if (virTypedParamsValidate(params, nparams,
+ VIR_THREADPOOL_WORKERS_MIN,
+ VIR_TYPED_PARAM_UINT,
+ VIR_THREADPOOL_WORKERS_MAX,
+ VIR_TYPED_PARAM_UINT,
+ VIR_THREADPOOL_WORKERS_PRIORITY,
+ VIR_TYPED_PARAM_UINT,
+ NULL) < 0)
+ return -1;
+
+ if ((param = virTypedParamsGet(params, nparams,
+ VIR_THREADPOOL_WORKERS_MIN)))
+ minWorkers = param->value.ui;
+
+ if ((param = virTypedParamsGet(params, nparams,
+ VIR_THREADPOOL_WORKERS_MAX)))
+ maxWorkers = param->value.ui;
+
+ if ((param = virTypedParamsGet(params, nparams,
+ VIR_THREADPOOL_WORKERS_PRIORITY)))
+ prioWorkers = param->value.ui;
+
+ if (virNetServerSetThreadPoolParameters(srv, minWorkers,
+ maxWorkers, prioWorkers) < 0)
+ return -1;
+
+ return 0;
+}
virTypedParameterPtr *params,
int *nparams,
unsigned int flags);
+int
+adminServerSetThreadPoolParameters(virNetServerPtr srv,
+ virTypedParameterPtr params,
+ int nparams,
+ unsigned int flags);
#endif /* __LIBVIRTD_ADMIN_SERVER_H__ */
int *nparams,
unsigned int flags);
+int virAdmServerSetThreadPoolParameters(virAdmServerPtr srv,
+ virTypedParameterPtr params,
+ int nparams,
+ unsigned int flags);
+
# ifdef __cplusplus
}
# endif
admin_typed_param params<ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX>;
};
+struct admin_server_set_threadpool_parameters_args {
+ admin_nonnull_server srv;
+ admin_typed_param params<ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX>;
+ unsigned int flags;
+};
+
/* Define the program number, protocol version and procedure numbers here. */
const ADMIN_PROGRAM = 0x06900690;
const ADMIN_PROTOCOL_VERSION = 1;
/**
* @generate: none
*/
- ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6
+ ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6,
+
+ /**
+ * @generate: none
+ */
+ ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7
};
virObjectUnlock(priv);
return rv;
}
+
+static int
+remoteAdminServerSetThreadPoolParameters(virAdmServerPtr srv,
+ virTypedParameterPtr params,
+ int nparams,
+ unsigned int flags)
+{
+ int rv = -1;
+ remoteAdminPrivPtr priv = srv->conn->privateData;
+ admin_server_set_threadpool_parameters_args args;
+
+ args.flags = flags;
+ make_nonnull_server(&args.srv, srv);
+
+ virObjectLock(priv);
+
+ if (virTypedParamsSerialize(params, nparams,
+ (virTypedParameterRemotePtr *) &args.params.params_val,
+ &args.params.params_len,
+ 0) < 0)
+ goto cleanup;
+
+
+ if (call(srv->conn, 0, ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS,
+ (xdrproc_t)xdr_admin_server_set_threadpool_parameters_args, (char *) &args,
+ (xdrproc_t)xdr_void, (char *) NULL) == -1)
+ goto cleanup;
+
+ rv = 0;
+ cleanup:
+ virTypedParamsRemoteFree((virTypedParameterRemotePtr) args.params.params_val,
+ args.params.params_len);
+ virObjectUnlock(priv);
+ return rv;
+}
admin_typed_param * params_val;
} params;
};
+struct admin_server_set_threadpool_parameters_args {
+ admin_nonnull_server srv;
+ struct {
+ u_int params_len;
+ admin_typed_param * params_val;
+ } params;
+ u_int flags;
+};
enum admin_procedure {
ADMIN_PROC_CONNECT_OPEN = 1,
ADMIN_PROC_CONNECT_CLOSE = 2,
ADMIN_PROC_CONNECT_LIST_SERVERS = 4,
ADMIN_PROC_CONNECT_LOOKUP_SERVER = 5,
ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6,
+ ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7,
};
virDispatchError(NULL);
return -1;
}
+
+/**
+ * virAdmServerSetThreadPoolParameters:
+ * @srv: a valid server object reference
+ * @params: pointer to threadpool typed parameter objects
+ * @nparams: number of parameters in @params
+ * @flags: extra flags; not used yet, so callers should always pass 0
+ *
+ * Change server threadpool parameters according to @params. Note that some
+ * tunables are read-only, thus any attempt to set them will result in a
+ * failure.
+ *
+ * Returns 0 on success, -1 in case of an error.
+ */
+int
+virAdmServerSetThreadPoolParameters(virAdmServerPtr srv,
+ virTypedParameterPtr params,
+ int nparams,
+ unsigned int flags)
+{
+ VIR_DEBUG("srv=%p, params=%p, nparams=%x, flags=%x",
+ srv, params, nparams, flags);
+
+ virResetLastError();
+
+ virCheckAdmServerReturn(srv, -1);
+ virCheckNonNullArgGoto(params, error);
+
+ if (remoteAdminServerSetThreadPoolParameters(srv, params,
+ nparams, flags) < 0)
+ goto error;
+
+ return 0;
+ error:
+ virDispatchError(NULL);
+ return -1;
+}
xdr_admin_connect_open_args;
xdr_admin_server_get_threadpool_parameters_args;
xdr_admin_server_get_threadpool_parameters_ret;
+xdr_admin_server_set_threadpool_parameters_args;
# datatypes.h
virAdmConnectClass;
virAdmServerGetThreadPoolParameters;
virAdmServerFree;
virAdmConnectLookupServer;
+ virAdmServerSetThreadPoolParameters;
};
virThreadPoolGetPriorityWorkers;
virThreadPoolNewFull;
virThreadPoolSendJob;
+virThreadPoolSetParameters;
# util/virtime.h
virObjectUnlock(srv);
return 0;
}
+
+int
+virNetServerSetThreadPoolParameters(virNetServerPtr srv,
+ long long int minWorkers,
+ long long int maxWorkers,
+ long long int prioWorkers)
+{
+ int ret;
+
+ virObjectLock(srv);
+ ret = virThreadPoolSetParameters(srv->workers, minWorkers,
+ maxWorkers, prioWorkers);
+ virObjectUnlock(srv);
+ return ret;
+}
size_t *nPrioWorkers,
size_t *jobQueueDepth);
+int virNetServerSetThreadPoolParameters(virNetServerPtr srv,
+ long long int minWorkers,
+ long long int maxWorkers,
+ long long int prioWorkers);
+
#endif /* __VIR_NET_SERVER_H__ */
size_t nWorkers;
virThreadPtr workers;
+ size_t maxPrioWorkers;
size_t nPrioWorkers;
virThreadPtr prioWorkers;
virCond prioCond;
bool priority;
};
+/* Test whether the worker needs to quit if the current number of workers @count
+ * is greater than @limit actually allows.
+ */
+static inline bool virThreadPoolWorkerQuitHelper(size_t count, size_t limit)
+{
+ return count > limit;
+}
+
static void virThreadPoolWorker(void *opaque)
{
struct virThreadPoolWorkerData *data = opaque;
virThreadPoolPtr pool = data->pool;
virCondPtr cond = data->cond;
bool priority = data->priority;
+ size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
+ size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers;
virThreadPoolJobPtr job = NULL;
VIR_FREE(data);
virMutexLock(&pool->mutex);
while (1) {
+ /* In order to support async worker termination, we need ensure that
+ * both busy and free workers know if they need to terminated. Thus,
+ * busy workers need to check for this fact before they start waiting for
+ * another job (and before taking another one from the queue); and
+ * free workers need to check for this right after waking up.
+ */
+ if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
+ goto out;
while (!pool->quit &&
((!priority && !pool->jobList.head) ||
(priority && !pool->jobList.firstPrio))) {
}
if (!priority)
pool->freeWorkers--;
+
+ if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
+ goto out;
}
if (pool->quit)
static int
virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority)
{
- virThreadPtr workers = priority ? pool->prioWorkers : pool->workers;
+ virThreadPtr *workers = priority ? &pool->prioWorkers : &pool->workers;
size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
size_t i = 0;
struct virThreadPoolWorkerData *data = NULL;
- if (VIR_EXPAND_N(workers, *curWorkers, gain) < 0)
+ if (VIR_EXPAND_N(*workers, *curWorkers, gain) < 0)
return -1;
for (i = 0; i < gain; i++) {
data->cond = priority ? &pool->prioCond : &pool->cond;
data->priority = priority;
- if (virThreadCreateFull(&workers[i],
+ if (virThreadCreateFull(&(*workers)[i],
false,
virThreadPoolWorker,
pool->jobFuncName,
pool->minWorkers = minWorkers;
pool->maxWorkers = maxWorkers;
+ pool->maxPrioWorkers = prioWorkers;
if (virThreadPoolExpand(pool, minWorkers, false) < 0)
goto error;
virMutexUnlock(&pool->mutex);
return -1;
}
+
+int
+virThreadPoolSetParameters(virThreadPoolPtr pool,
+ long long int minWorkers,
+ long long int maxWorkers,
+ long long int prioWorkers)
+{
+ size_t max;
+ size_t min;
+
+ virMutexLock(&pool->mutex);
+
+ max = maxWorkers >= 0 ? maxWorkers : pool->maxWorkers;
+ min = minWorkers >= 0 ? minWorkers : pool->minWorkers;
+ if (min > max) {
+ virReportError(VIR_ERR_INVALID_ARG, "%s",
+ _("minWorkers cannot be larger than maxWorkers"));
+ goto error;
+ }
+
+ if (minWorkers >= 0) {
+ if ((size_t) minWorkers > pool->nWorkers &&
+ virThreadPoolExpand(pool, minWorkers - pool->nWorkers,
+ false) < 0)
+ goto error;
+ pool->minWorkers = minWorkers;
+ }
+
+ if (maxWorkers >= 0) {
+ pool->maxWorkers = maxWorkers;
+ virCondBroadcast(&pool->cond);
+ }
+
+ if (prioWorkers >= 0) {
+ if (prioWorkers < pool->nPrioWorkers) {
+ virCondBroadcast(&pool->prioCond);
+ } else if ((size_t) prioWorkers > pool->nPrioWorkers &&
+ virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers,
+ true) < 0) {
+ goto error;
+ }
+ pool->maxPrioWorkers = prioWorkers;
+ }
+
+ virMutexUnlock(&pool->mutex);
+ return 0;
+
+ error:
+ virMutexUnlock(&pool->mutex);
+ return -1;
+}
void *jobdata) ATTRIBUTE_NONNULL(1)
ATTRIBUTE_RETURN_CHECK;
+int virThreadPoolSetParameters(virThreadPoolPtr pool,
+ long long int minWorkers,
+ long long int maxWorkers,
+ long long int prioWorkers);
+
#endif