direct-io.hg
changeset 13581:248a9c36d816
[XEND] Add Task support in Xen API implementation.
Added progress tracking to some common methods like VM.start so the
progress during async invocation.
Signed-off-by: Alastair Tse <atse@xensource.com>
Added progress tracking to some common methods like VM.start so the
progress during async invocation.
Signed-off-by: Alastair Tse <atse@xensource.com>
author | Alastair Tse <atse@xensource.com> |
---|---|
date | Wed Jan 24 13:26:26 2007 +0000 (2007-01-24) |
parents | 259470f0856b |
children | 9292da5e9a27 |
files | tools/python/xen/xend/XendAPI.py tools/python/xen/xend/XendAPIConstants.py tools/python/xen/xend/XendDomainInfo.py tools/python/xen/xend/XendTask.py tools/python/xen/xend/XendTaskManager.py tools/python/xen/xend/server/XMLRPCServer.py |
line diff
1.1 --- a/tools/python/xen/xend/XendAPI.py Wed Jan 24 14:36:03 2007 +0000 1.2 +++ b/tools/python/xen/xend/XendAPI.py Wed Jan 24 13:26:26 2007 +0000 1.3 @@ -22,12 +22,13 @@ import sys 1.4 import traceback 1.5 1.6 from xen.xend import XendDomain, XendDomainInfo, XendNode 1.7 -from xen.xend import XendLogging 1.8 +from xen.xend import XendLogging, XendTaskManager 1.9 1.10 from xen.xend.XendAuthSessions import instance as auth_manager 1.11 from xen.xend.XendError import * 1.12 from xen.xend.XendClient import ERROR_INVALID_DOMAIN 1.13 from xen.xend.XendLogging import log 1.14 +from xen.xend.XendTask import XendTask 1.15 1.16 from xen.xend.XendAPIConstants import * 1.17 from xen.util.xmlrpclib2 import stringify 1.18 @@ -237,16 +238,27 @@ def valid_sr(func): 1.19 'SR_HANDLE_INVALID', func, *args, **kwargs) 1.20 1.21 def valid_pif(func): 1.22 - """Decorator to verify if sr_ref is valid before calling 1.23 + """Decorator to verify if pif_ref is valid before calling 1.24 method. 1.25 1.26 - @param func: function with params: (self, session, sr_ref) 1.27 + @param func: function with params: (self, session, pif_ref) 1.28 @rtype: callable object 1.29 """ 1.30 return lambda *args, **kwargs: \ 1.31 _check_ref(lambda r: r in XendNode.instance().pifs, 1.32 'PIF_HANDLE_INVALID', func, *args, **kwargs) 1.33 1.34 +def valid_task(func): 1.35 + """Decorator to verify if task_ref is valid before calling 1.36 + method. 1.37 + 1.38 + @param func: function with params: (self, session, task_ref) 1.39 + @rtype: callable object 1.40 + """ 1.41 + return lambda *args, **kwargs: \ 1.42 + _check_ref(XendTaskManager.get_task, 1.43 + 'TASK_HANDLE_INVALID', func, *args, **kwargs) 1.44 + 1.45 # ----------------------------- 1.46 # Bridge to Legacy XM API calls 1.47 # ----------------------------- 1.48 @@ -288,18 +300,17 @@ class XendAPI: 1.49 def __init__(self, auth): 1.50 self.auth = auth 1.51 1.52 - 1.53 Base_attr_ro = ['uuid'] 1.54 Base_attr_rw = [] 1.55 - Base_methods = ['destroy', 'get_by_uuid', 'get_record'] 1.56 - Base_funcs = ['create', 'get_all'] 1.57 + Base_methods = [('destroy', None), ('get_record', 'Struct')] 1.58 + Base_funcs = [('get_all', 'Set'), ('get_by_uuid', None)] 1.59 1.60 # Xen API: Class Session 1.61 # ---------------------------------------------------------------- 1.62 # NOTE: Left unwrapped by __init__ 1.63 1.64 session_attr_ro = ['this_host', 'this_user'] 1.65 - session_methods = ['logout'] 1.66 + session_methods = [('logout', None)] 1.67 # session_funcs = ['login_with_password'] 1.68 1.69 def session_login_with_password(self, *args): 1.70 @@ -346,7 +357,77 @@ class XendAPI: 1.71 1.72 # Xen API: Class Tasks 1.73 # ---------------------------------------------------------------- 1.74 - # TODO: NOT IMPLEMENTED YET 1.75 + 1.76 + task_attr_ro = ['status', 1.77 + 'progress', 1.78 + 'eta', 1.79 + 'type', 1.80 + 'result', 1.81 + 'error_code', 1.82 + 'error_info'] 1.83 + 1.84 + task_attr_rw = ['name_label', 1.85 + 'name_description'] 1.86 + 1.87 + task_funcs = [('get_by_name_label', 'Set(task)')] 1.88 + 1.89 + def task_get_status(self, session, task_ref): 1.90 + task = XendTaskManager.get_task(task_ref) 1.91 + return xen_api_success(task.get_status()) 1.92 + 1.93 + def task_get_progress(self, session, task_ref): 1.94 + task = XendTaskManager.get_task(task_ref) 1.95 + return xen_api_success(task.progress) 1.96 + 1.97 + def task_get_eta(self, session, task_ref): 1.98 + task = XendTaskManager.get_task(task_ref) 1.99 + return xen_api_success(task.eta) 1.100 + 1.101 + def task_get_type(self, session, task_ref): 1.102 + task = XendTaskManager.get_task(task_ref) 1.103 + return xen_api_success(task.type) 1.104 + 1.105 + def task_get_result(self, session, task_ref): 1.106 + task = XendTaskManager.get_task(task_ref) 1.107 + return xen_api_success(task.result) 1.108 + 1.109 + def task_get_error_code(self, session, task_ref): 1.110 + task = XendTaskManager.get_task(task_ref) 1.111 + return xen_api_success(task.error_code) 1.112 + 1.113 + def task_get_error_info(self, session, task_ref): 1.114 + task = XendTaskManager.get_task(task_ref) 1.115 + return xen_api_success(task.error_info) 1.116 + 1.117 + def task_get_name_label(self, session, task_ref): 1.118 + task = XendTaskManager.get_task(task_ref) 1.119 + return xen_api_success(task.name_label) 1.120 + 1.121 + def task_get_name_description(self, session, task_ref): 1.122 + task = XendTaskManager.get_task(task_ref) 1.123 + return xen_api_success(task.name_description) 1.124 + 1.125 + def task_set_name_label(self, session, task_ref, label): 1.126 + task = XendTaskManager.get_task(task_ref) 1.127 + task.name_label = label 1.128 + return xen_api_success_void() 1.129 + 1.130 + def task_set_name_description(self, session, task_ref, desc): 1.131 + task = XendTaskManager.get_task(task_ref) 1.132 + task.name_description = desc 1.133 + return xen_api_success_void() 1.134 + 1.135 + def task_get_all(self, session): 1.136 + tasks = XendTaskManager.get_all_tasks() 1.137 + return xen_api_success(tasks) 1.138 + 1.139 + def task_destroy(self, session, task_uuid): 1.140 + XendTaskManager.destroy_task(task_uuid) 1.141 + return xen_api_success_void() 1.142 + 1.143 + def task_get_record(self, session, task_ref): 1.144 + task = XendTaskManager.get_task(task_ref) 1.145 + return xen_api_success(task.get_record()) 1.146 1.147 # Xen API: Class Host 1.148 # ---------------------------------------------------------------- 1.149 @@ -358,12 +439,12 @@ class XendAPI: 1.150 host_attr_rw = ['name_label', 1.151 'name_description'] 1.152 1.153 - host_methods = ['disable', 1.154 - 'enable', 1.155 - 'reboot', 1.156 - 'shutdown'] 1.157 + host_methods = [('disable', None), 1.158 + ('enable', None), 1.159 + ('reboot', None), 1.160 + ('shutdown', None)] 1.161 1.162 - host_funcs = ['get_by_name_label'] 1.163 + host_funcs = [('get_by_name_label', 'Set(host)')] 1.164 1.165 # attributes 1.166 def host_get_name_label(self, session, host_ref): 1.167 @@ -456,8 +537,6 @@ class XendAPI: 1.168 # class methods 1.169 def host_cpu_get_all(self, session): 1.170 return xen_api_success(XendNode.instance().get_host_cpu_refs()) 1.171 - def host_cpu_create(self, session, struct): 1.172 - return xen_api_error(XEND_ERROR_UNSUPPORTED) 1.173 1.174 1.175 # Xen API: Class network 1.176 @@ -468,7 +547,9 @@ class XendAPI: 1.177 'name_description', 1.178 'default_gateway', 1.179 'default_netmask'] 1.180 - 1.181 + 1.182 + network_funcs = [('create', 'network')] 1.183 + 1.184 def network_create(self, _, name_label, name_description, 1.185 default_gateway, default_netmask): 1.186 return xen_api_success( 1.187 @@ -534,7 +615,7 @@ class XendAPI: 1.188 1.189 PIF_attr_inst = PIF_attr_rw 1.190 1.191 - PIF_methods = ['create_VLAN'] 1.192 + PIF_methods = [('create_VLAN', 'int')] 1.193 1.194 def _get_PIF(self, ref): 1.195 return XendNode.instance().pifs[ref] 1.196 @@ -659,18 +740,19 @@ class XendAPI: 1.197 'platform_keymap', 1.198 'otherConfig'] 1.199 1.200 - VM_methods = ['clone', 1.201 - 'start', 1.202 - 'pause', 1.203 - 'unpause', 1.204 - 'clean_shutdown', 1.205 - 'clean_reboot', 1.206 - 'hard_shutdown', 1.207 - 'hard_reboot', 1.208 - 'suspend', 1.209 - 'resume'] 1.210 + VM_methods = [('clone', 'VM'), 1.211 + ('start', None), 1.212 + ('pause', None), 1.213 + ('unpause', None), 1.214 + ('clean_shutdown', None), 1.215 + ('clean_reboot', None), 1.216 + ('hard_shutdown', None), 1.217 + ('hard_reboot', None), 1.218 + ('suspend', None), 1.219 + ('resume', None)] 1.220 1.221 - VM_funcs = ['get_by_name_label'] 1.222 + VM_funcs = [('create', 'VM'), 1.223 + ('get_by_name_label', 'Set(VM)')] 1.224 1.225 # parameters required for _create() 1.226 VM_attr_inst = [ 1.227 @@ -991,7 +1073,8 @@ class XendAPI: 1.228 1.229 def VM_create(self, session, vm_struct): 1.230 xendom = XendDomain.instance() 1.231 - domuuid = xendom.create_domain(vm_struct) 1.232 + domuuid = XendTask.log_progress(0, 100, 1.233 + xendom.create_domain, vm_struct) 1.234 return xen_api_success(domuuid) 1.235 1.236 # object methods 1.237 @@ -1052,31 +1135,49 @@ class XendAPI: 1.238 def VM_clean_reboot(self, session, vm_ref): 1.239 xendom = XendDomain.instance() 1.240 xeninfo = xendom.get_vm_by_uuid(vm_ref) 1.241 - xeninfo.shutdown("reboot") 1.242 + XendTask.log_progress(0, 100, xeninfo.shutdown, "reboot") 1.243 return xen_api_success_void() 1.244 + 1.245 def VM_clean_shutdown(self, session, vm_ref): 1.246 xendom = XendDomain.instance() 1.247 xeninfo = xendom.get_vm_by_uuid(vm_ref) 1.248 - xeninfo.shutdown("poweroff") 1.249 + XendTask.log_progress(0, 100, xeninfo.shutdown, "poweroff") 1.250 return xen_api_success_void() 1.251 + 1.252 def VM_clone(self, session, vm_ref): 1.253 return xen_api_error(XEND_ERROR_UNSUPPORTED) 1.254 + 1.255 def VM_destroy(self, session, vm_ref): 1.256 - return do_vm_func("domain_delete", vm_ref) 1.257 + return XendTask.log_progress(0, 100, do_vm_func, 1.258 + "domain_delete", vm_ref) 1.259 + 1.260 def VM_hard_reboot(self, session, vm_ref): 1.261 - return xen_api_error(XEND_ERROR_UNSUPPORTED) 1.262 + return xen_api_error(XEND_ERROR_UNSUPPORTED) 1.263 + 1.264 def VM_hard_shutdown(self, session, vm_ref): 1.265 - return do_vm_func("domain_destroy", vm_ref) 1.266 + return XendTask.log_progress(0, 100, do_vm_func, 1.267 + "domain_destroy", vm_ref) 1.268 def VM_pause(self, session, vm_ref): 1.269 - return do_vm_func("domain_pause", vm_ref) 1.270 + return XendTask.log_progress(0, 100, do_vm_func, 1.271 + "domain_pause", vm_ref) 1.272 + 1.273 def VM_resume(self, session, vm_ref, start_paused): 1.274 - return do_vm_func("domain_resume", vm_ref, start_paused = start_paused) 1.275 + return XendTask.log_progress(0, 100, do_vm_func, 1.276 + "domain_resume", vm_ref, 1.277 + start_paused = start_paused) 1.278 + 1.279 def VM_start(self, session, vm_ref, start_paused): 1.280 - return do_vm_func("domain_start", vm_ref, start_paused = start_paused) 1.281 + return XendTask.log_progress(0, 100, do_vm_func, 1.282 + "domain_start", vm_ref, 1.283 + start_paused = start_paused) 1.284 + 1.285 def VM_suspend(self, session, vm_ref): 1.286 - return do_vm_func("domain_suspend", vm_ref) 1.287 + return XendTask.log_progress(0, 100, do_vm_func, 1.288 + "domain_suspend", vm_ref) 1.289 + 1.290 def VM_unpause(self, session, vm_ref): 1.291 - return do_vm_func("domain_unpause", vm_ref) 1.292 + return XendTask.log_progress(0, 100, do_vm_func, 1.293 + "domain_unpause", vm_ref) 1.294 1.295 # Xen API: Class VBD 1.296 # ---------------------------------------------------------------- 1.297 @@ -1095,8 +1196,9 @@ class XendAPI: 1.298 1.299 VBD_attr_inst = VBD_attr_rw + ['image'] 1.300 1.301 - VBD_methods = ['media_change'] 1.302 - 1.303 + VBD_methods = [('media_change', None)] 1.304 + VBD_funcs = [('create', 'VBD')] 1.305 + 1.306 # object methods 1.307 def VBD_get_record(self, session, vbd_ref): 1.308 xendom = XendDomain.instance() 1.309 @@ -1191,6 +1293,9 @@ class XendAPI: 1.310 1.311 VIF_attr_inst = VIF_attr_rw 1.312 1.313 + VIF_funcs = [('create', 'VIF')] 1.314 + 1.315 + 1.316 # object methods 1.317 def VIF_get_record(self, session, vif_ref): 1.318 xendom = XendDomain.instance() 1.319 @@ -1242,8 +1347,9 @@ class XendAPI: 1.320 'read_only'] 1.321 VDI_attr_inst = VDI_attr_ro + VDI_attr_rw 1.322 1.323 - VDI_methods = ['snapshot'] 1.324 - VDI_funcs = ['get_by_name_label'] 1.325 + VDI_methods = [('snapshot', 'VDI')] 1.326 + VDI_funcs = [('create', 'VDI'), 1.327 + ('get_by_name_label', 'Set(VDI)')] 1.328 1.329 def _get_VDI(self, ref): 1.330 return XendNode.instance().get_sr().xen_api_get_by_uuid(ref) 1.331 @@ -1369,6 +1475,8 @@ class XendAPI: 1.332 1.333 VTPM_attr_inst = VTPM_attr_rw 1.334 1.335 + VTPM_funcs = [('create', 'VTPM')] 1.336 + 1.337 # object methods 1.338 def VTPM_get_record(self, session, vtpm_ref): 1.339 xendom = XendDomain.instance() 1.340 @@ -1467,8 +1575,9 @@ class XendAPI: 1.341 'name_label', 1.342 'name_description'] 1.343 1.344 - SR_methods = ['clone'] 1.345 - SR_funcs = ['get_by_name_label'] 1.346 + SR_methods = [('clone', 'SR')] 1.347 + SR_funcs = [('get_by_name_label', 'Set(SR)'), 1.348 + ('get_by_uuid', 'SR')] 1.349 1.350 # Class Functions 1.351 def SR_get_all(self, session): 1.352 @@ -1543,6 +1652,67 @@ class XendAPI: 1.353 return xen_api_success_void() 1.354 1.355 1.356 +class XendAPIAsyncProxy: 1.357 + """ A redirector for Async.Class.function calls to XendAPI 1.358 + but wraps the call for use with the XendTaskManager. 1.359 + 1.360 + @ivar xenapi: Xen API instance 1.361 + @ivar method_map: Mapping from XMLRPC method name to callable objects. 1.362 + """ 1.363 + 1.364 + method_prefix = 'Async.' 1.365 + 1.366 + def __init__(self, xenapi): 1.367 + """Initialises the Async Proxy by making a map of all 1.368 + implemented Xen API methods for use with XendTaskManager. 1.369 + 1.370 + @param xenapi: XendAPI instance 1.371 + """ 1.372 + self.xenapi = xenapi 1.373 + self.method_map = {} 1.374 + for method_name in dir(self.xenapi): 1.375 + method = getattr(self.xenapi, method_name) 1.376 + if method_name[0] != '_' and hasattr(method, 'async') \ 1.377 + and method.async == True: 1.378 + self.method_map[method.api] = method 1.379 + 1.380 + def _dispatch(self, method, args): 1.381 + """Overridden method so that SimpleXMLRPCServer will 1.382 + resolve methods through this method rather than through 1.383 + inspection. 1.384 + 1.385 + @param method: marshalled method name from XMLRPC. 1.386 + @param args: marshalled arguments from XMLRPC. 1.387 + """ 1.388 + 1.389 + # Only deal with method names that start with "Async." 1.390 + if not method.startswith(self.method_prefix): 1.391 + raise Exception('Method %s not supported' % method) 1.392 + 1.393 + # Require 'session' argument to be present. 1.394 + if len(args) < 1: 1.395 + raise Exception('Not enough arguments') 1.396 + 1.397 + # Lookup synchronous version of the method 1.398 + synchronous_method_name = method[len(self.method_prefix):] 1.399 + if synchronous_method_name not in self.method_map: 1.400 + raise Exception('Method %s not supported' % method) 1.401 + 1.402 + method = self.method_map[synchronous_method_name] 1.403 + 1.404 + # Validate the session before proceeding 1.405 + session = args[0] 1.406 + if not auth_manager().is_session_valid(session): 1.407 + return xen_api_error(['SESSION_INVALID', session]) 1.408 + 1.409 + # create and execute the task, and return task_uuid 1.410 + return_type = getattr(method, 'return_type', None) 1.411 + task_uuid = XendTaskManager.create_task(method, args, 1.412 + synchronous_method_name, 1.413 + return_type, 1.414 + synchronous_method_name) 1.415 + return xen_api_success(task_uuid) 1.416 + 1.417 def _decorate(): 1.418 """Initialise Xen API wrapper by making sure all functions 1.419 have the correct validation decorators such as L{valid_host} 1.420 @@ -1561,7 +1731,8 @@ def _decorate(): 1.421 'VDI' : valid_vdi, 1.422 'VTPM' : valid_vtpm, 1.423 'SR' : valid_sr, 1.424 - 'PIF' : valid_pif 1.425 + 'PIF' : valid_pif, 1.426 + 'task' : valid_task, 1.427 } 1.428 1.429 # Cheat methods 1.430 @@ -1582,17 +1753,11 @@ def _decorate(): 1.431 setattr(XendAPI, get_by_uuid, _get_by_uuid) 1.432 setattr(XendAPI, get_uuid, _get_uuid) 1.433 1.434 - # 2. get_record is just getting all the attributes, so provide 1.435 - # a fake template implementation. 1.436 - # 1.437 - # TODO: ... 1.438 - 1.439 - 1.440 # Wrapping validators around XMLRPC calls 1.441 # --------------------------------------- 1.442 1.443 for cls, validator in classes.items(): 1.444 - def doit(n, takes_instance): 1.445 + def doit(n, takes_instance, async_support = False, return_type = None): 1.446 n_ = n.replace('.', '_') 1.447 try: 1.448 f = getattr(XendAPI, n_) 1.449 @@ -1604,6 +1769,10 @@ def _decorate(): 1.450 for v in validators: 1.451 f = v(f) 1.452 f.api = n 1.453 + f.async = async_support 1.454 + if return_type: 1.455 + f.return_type = return_type 1.456 + 1.457 setattr(XendAPI, n_, f) 1.458 except AttributeError: 1.459 log.warn("API call: %s not found" % n) 1.460 @@ -1616,19 +1785,21 @@ def _decorate(): 1.461 1.462 # wrap validators around readable class attributes 1.463 for attr_name in ro_attrs + rw_attrs + XendAPI.Base_attr_ro: 1.464 - doit('%s.get_%s' % (cls, attr_name), True) 1.465 + doit('%s.get_%s' % (cls, attr_name), True, async_support = False) 1.466 1.467 # wrap validators around writable class attrributes 1.468 for attr_name in rw_attrs + XendAPI.Base_attr_rw: 1.469 - doit('%s.set_%s' % (cls, attr_name), True) 1.470 + doit('%s.set_%s' % (cls, attr_name), True, async_support = False) 1.471 1.472 # wrap validators around methods 1.473 - for method_name in methods + XendAPI.Base_methods: 1.474 - doit('%s.%s' % (cls, method_name), True) 1.475 + for method_name, return_type in methods + XendAPI.Base_methods: 1.476 + doit('%s.%s' % (cls, method_name), True, async_support = True) 1.477 1.478 # wrap validators around class functions 1.479 - for func_name in funcs + XendAPI.Base_funcs: 1.480 - doit('%s.%s' % (cls, func_name), False) 1.481 + for func_name, return_type in funcs + XendAPI.Base_funcs: 1.482 + doit('%s.%s' % (cls, func_name), False, async_support = True, 1.483 + return_type = return_type) 1.484 + 1.485 1.486 _decorate() 1.487
2.1 --- a/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 14:36:03 2007 +0000 2.2 +++ b/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 13:26:26 2007 +0000 2.3 @@ -74,3 +74,4 @@ XEN_API_VBD_MODE = ['RO', 'RW'] 2.4 XEN_API_VDI_TYPE = ['system', 'user', 'ephemeral'] 2.5 XEN_API_DRIVER_TYPE = ['ioemu', 'paravirtualised'] 2.6 XEN_API_VBD_TYPE = ['CD', 'Disk'] 2.7 +XEN_API_TASK_STATUS_TYPE = ['pending', 'success', 'failure']
3.1 --- a/tools/python/xen/xend/XendDomainInfo.py Wed Jan 24 14:36:03 2007 +0000 3.2 +++ b/tools/python/xen/xend/XendDomainInfo.py Wed Jan 24 13:26:26 2007 +0000 3.3 @@ -44,6 +44,7 @@ from xen.xend.XendConfig import scrub_pa 3.4 from xen.xend.XendBootloader import bootloader, bootloader_tidy 3.5 from xen.xend.XendError import XendError, VmError 3.6 from xen.xend.XendDevices import XendDevices 3.7 +from xen.xend.XendTask import XendTask 3.8 from xen.xend.xenstore.xstransact import xstransact, complete 3.9 from xen.xend.xenstore.xsutil import GetDomainPath, IntroduceDomain, ResumeDomain 3.10 from xen.xend.xenstore.xswatch import xswatch 3.11 @@ -387,12 +388,13 @@ class XendDomainInfo: 3.12 3.13 if self.state == DOM_STATE_HALTED: 3.14 try: 3.15 - self._constructDomain() 3.16 - self._initDomain() 3.17 - self._storeVmDetails() 3.18 - self._storeDomDetails() 3.19 - self._registerWatches() 3.20 - self.refreshShutdown() 3.21 + XendTask.log_progress(0, 30, self._constructDomain) 3.22 + XendTask.log_progress(31, 60, self._initDomain) 3.23 + 3.24 + XendTask.log_progress(61, 70, self._storeVmDetails) 3.25 + XendTask.log_progress(71, 80, self._storeDomDetails) 3.26 + XendTask.log_progress(81, 90, self._registerWatches) 3.27 + XendTask.log_progress(91, 100, self.refreshShutdown) 3.28 3.29 # save running configuration if XendDomains believe domain is 3.30 # persistent
4.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 4.2 +++ b/tools/python/xen/xend/XendTask.py Wed Jan 24 13:26:26 2007 +0000 4.3 @@ -0,0 +1,226 @@ 4.4 +#=========================================================================== 4.5 +# This library is free software; you can redistribute it and/or 4.6 +# modify it under the terms of version 2.1 of the GNU Lesser General Public 4.7 +# License as published by the Free Software Foundation. 4.8 +# 4.9 +# This library is distributed in the hope that it will be useful, 4.10 +# but WITHOUT ANY WARRANTY; without even the implied warranty of 4.11 +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 4.12 +# Lesser General Public License for more details. 4.13 +# 4.14 +# You should have received a copy of the GNU Lesser General Public 4.15 +# License along with this library; if not, write to the Free Software 4.16 +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 4.17 +#============================================================================ 4.18 +# Copyright (C) 2007 XenSource Ltd 4.19 +#============================================================================ 4.20 + 4.21 +from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE 4.22 +from xen.xend.XendLogging import log 4.23 +import thread 4.24 +import threading 4.25 + 4.26 +class XendTask(threading.Thread): 4.27 + """Represents a Asynchronous Task used by Xen API. 4.28 + 4.29 + Basically proxies the callable object in a thread and returns the 4.30 + results via self.{type,result,error_code,error_info}. 4.31 + 4.32 + @cvar task_progress: Thread local storage for progress tracking. 4.33 + It is a dict indexed by thread_id. Note that the 4.34 + thread_id may be reused when the previous 4.35 + thread with the thread_id ends. 4.36 + 4.37 + @cvar task_progress_lock: lock on thread access to task_progress 4.38 + 4.39 + """ 4.40 + 4.41 + # progress stack: 4.42 + # thread_id : [(start_task, end_task), 4.43 + # (start_sub_task, end_sub_task)..] 4.44 + # example : (0, 100), (50, 100) (50, 100) ... 4.45 + # That would mean that the task is 75% complete. 4.46 + # as it is 50% of the last 50% of the task. 4.47 + 4.48 + task_progress = {} 4.49 + task_progress_lock = threading.Lock() 4.50 + 4.51 + def __init__(self, uuid, func, args, func_name, return_type = None, 4.52 + label = None, desc = None): 4.53 + """ 4.54 + @param uuid: UUID of the task 4.55 + @type uuid: string 4.56 + @param func: Method to call (from XendAPI) 4.57 + @type func: callable object 4.58 + @param args: arguments to pass to function 4.59 + @type args: list or tuple 4.60 + @param label: name label of the task. 4.61 + @type label: string 4.62 + @param desc: name description of the task. 4.63 + @type desc: string 4.64 + @param func_name: function name, eg ('VM.start') 4.65 + @type desc: string 4.66 + """ 4.67 + 4.68 + threading.Thread.__init__(self) 4.69 + self.status_lock = threading.Lock() 4.70 + self.status = XEN_API_TASK_STATUS_TYPE[0] 4.71 + 4.72 + self.progress = 0 4.73 + self.eta = None # TODO: we have no time estimates 4.74 + self.type = return_type 4.75 + self.uuid = uuid 4.76 + 4.77 + self.result = None 4.78 + self.error_code = '' 4.79 + self.error_info = [] 4.80 + 4.81 + self.name_label = label or func.__name__ 4.82 + self.name_description = desc 4.83 + self.thread_id = 0 4.84 + 4.85 + self.func_name = func_name 4.86 + self.func = func 4.87 + self.args = args 4.88 + 4.89 + def set_status(self, new_status): 4.90 + self.status_lock.acquire() 4.91 + try: 4.92 + self.status = new_status 4.93 + finally: 4.94 + self.status_lock.release() 4.95 + 4.96 + def get_status(self): 4.97 + self.status_lock.acquire() 4.98 + try: 4.99 + return self.status 4.100 + finally: 4.101 + self.status_lock.release() 4.102 + 4.103 + def run(self): 4.104 + """Runs the method and stores the result for later access. 4.105 + 4.106 + Is invoked by threading.Thread.start(). 4.107 + """ 4.108 + 4.109 + self.thread_id = thread.get_ident() 4.110 + self.task_progress_lock.acquire() 4.111 + try: 4.112 + self.task_progress[self.thread_id] = {} 4.113 + self.progress = 0 4.114 + finally: 4.115 + self.task_progress_lock.release() 4.116 + 4.117 + try: 4.118 + result = self.func(*self.args) 4.119 + if result['Status'] == 'Success': 4.120 + self.result = result['Value'] 4.121 + self.set_status(XEN_API_TASK_STATUS_TYPE[1]) 4.122 + else: 4.123 + self.error_code = result['ErrorDescription'][0] 4.124 + self.error_info = result['ErrorDescription'][1:] 4.125 + self.set_status(XEN_API_TASK_STATUS_TYPE[2]) 4.126 + except Exception, e: 4.127 + log.exception('Error running Async Task') 4.128 + self.error_code = 'INTERNAL ERROR' 4.129 + self.error_info = [str(e)] 4.130 + self.set_status(XEN_API_TASK_STATUS_TYPE[2]) 4.131 + 4.132 + self.task_progress_lock.acquire() 4.133 + try: 4.134 + del self.task_progress[self.thread_id] 4.135 + self.progress = 100 4.136 + finally: 4.137 + self.task_progress_lock.release() 4.138 + 4.139 + def get_record(self): 4.140 + """Returns a Xen API compatible record.""" 4.141 + return { 4.142 + 'uuid': self.uuid, 4.143 + 'name_label': self.name_label, 4.144 + 'name_description': self.name_description, 4.145 + 'status': self.status, 4.146 + 'progress': self.get_progress(), 4.147 + 'eta': self.eta, 4.148 + 'type': self.type, 4.149 + 'result': self.result, 4.150 + 'error_code': self.error_code, 4.151 + 'error_info': self.error_info, 4.152 + } 4.153 + 4.154 + def get_progress(self): 4.155 + """ Checks the thread local progress storage. """ 4.156 + if self.status != XEN_API_TASK_STATUS_TYPE[0]: 4.157 + return 100 4.158 + 4.159 + self.task_progress_lock.acquire() 4.160 + try: 4.161 + # Pop each progress range in the stack and map it on to 4.162 + # the next progress range until we find out cumulative 4.163 + # progress based on the (start, end) range of each level 4.164 + start = 0 4.165 + prog_stack = self.task_progress.get(self.thread_id, [])[:] 4.166 + if len(prog_stack) > 0: 4.167 + start, stop = prog_stack.pop() 4.168 + while prog_stack: 4.169 + new_start, new_stop = prog_stack.pop() 4.170 + start = new_start + ((new_stop - new_start)/100.0 * start) 4.171 + 4.172 + # only update progress if it increases, this will prevent 4.173 + # progress from going backwards when tasks are popped off 4.174 + # the stack 4.175 + if start > self.progress: 4.176 + self.progress = int(start) 4.177 + finally: 4.178 + self.task_progress_lock.release() 4.179 + 4.180 + return self.progress 4.181 + 4.182 + 4.183 + def log_progress(cls, progress_min, progress_max, 4.184 + func, *args, **kwds): 4.185 + """ Callable function wrapper that logs the progress of the 4.186 + function to thread local storage for task progress calculation. 4.187 + 4.188 + This is a class method so other parts of Xend will update 4.189 + the task progress by calling: 4.190 + 4.191 + XendTask.push_progress(progress_min, progress_max, 4.192 + func, *args, **kwds) 4.193 + 4.194 + The results of the progress is stored in thread local storage 4.195 + and the result of the func(*args, **kwds) is returned back 4.196 + to the caller. 4.197 + 4.198 + """ 4.199 + thread_id = thread.get_ident() 4.200 + retval = None 4.201 + 4.202 + # Log the start of the method 4.203 + cls.task_progress_lock.acquire() 4.204 + try: 4.205 + if type(cls.task_progress.get(thread_id)) != list: 4.206 + cls.task_progress[thread_id] = [] 4.207 + 4.208 + cls.task_progress[thread_id].append((progress_min, 4.209 + progress_max)) 4.210 + finally: 4.211 + cls.task_progress_lock.release() 4.212 + 4.213 + # Execute the method 4.214 + retval = func(*args, **kwds) 4.215 + 4.216 + # Log the end of the method by popping the progress range 4.217 + # off the stack. 4.218 + cls.task_progress_lock.acquire() 4.219 + try: 4.220 + cls.task_progress[thread_id].pop() 4.221 + finally: 4.222 + cls.task_progress_lock.release() 4.223 + 4.224 + return retval 4.225 + 4.226 + log_progress = classmethod(log_progress) 4.227 + 4.228 + 4.229 +
5.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 5.2 +++ b/tools/python/xen/xend/XendTaskManager.py Wed Jan 24 13:26:26 2007 +0000 5.3 @@ -0,0 +1,110 @@ 5.4 +#=========================================================================== 5.5 +# This library is free software; you can redistribute it and/or 5.6 +# modify it under the terms of version 2.1 of the GNU Lesser General Public 5.7 +# License as published by the Free Software Foundation. 5.8 +# 5.9 +# This library is distributed in the hope that it will be useful, 5.10 +# but WITHOUT ANY WARRANTY; without even the implied warranty of 5.11 +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 5.12 +# Lesser General Public License for more details. 5.13 +# 5.14 +# You should have received a copy of the GNU Lesser General Public 5.15 +# License along with this library; if not, write to the Free Software 5.16 +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 5.17 +#============================================================================ 5.18 +# Copyright (C) 2007 XenSource Ltd 5.19 +#============================================================================ 5.20 + 5.21 +""" 5.22 +Task Manager for Xen API asynchronous tasks. 5.23 + 5.24 +Stores all tasks in a simple dictionary in module's own local storage to 5.25 +avoid the 'instance()' methods. 5.26 + 5.27 +Tasks are indexed by UUID. 5.28 + 5.29 +""" 5.30 + 5.31 +from xen.xend.XendTask import XendTask 5.32 +from xen.xend import uuid 5.33 +import threading 5.34 + 5.35 +tasks = {} 5.36 +tasks_lock = threading.Lock() 5.37 + 5.38 +def create_task(func, args, func_name, return_type = None, label = ''): 5.39 + """Creates a new Task and registers it with the XendTaskManager. 5.40 + 5.41 + @param func: callable object XMLRPC method 5.42 + @type func: callable object 5.43 + @param args: tuple or list of arguments 5.44 + @type args: tuple or list 5.45 + @param func_name: XMLRPC method name, so we can estimate the progress 5.46 + @type func_name: string 5.47 + 5.48 + @return: Task UUID 5.49 + @rtype: string. 5.50 + """ 5.51 + task_uuid = uuid.createString() 5.52 + try: 5.53 + tasks_lock.acquire() 5.54 + task = XendTask(task_uuid, func, args, func_name, 5.55 + return_type = return_type, label = label) 5.56 + tasks[task_uuid] = task 5.57 + finally: 5.58 + tasks_lock.release() 5.59 + 5.60 + task.start() 5.61 + 5.62 + return task_uuid 5.63 + 5.64 +def destroy_task(task_uuid): 5.65 + """Destroys a task. 5.66 + 5.67 + @param task_uuid: Task UUID 5.68 + @type task_uuid: string. 5.69 + """ 5.70 + try: 5.71 + tasks_lock.acquire() 5.72 + if task_uuid in tasks: 5.73 + del tasks[task_uuid] 5.74 + finally: 5.75 + tasks_lock.release() 5.76 + 5.77 +def get_all_tasks(): 5.78 + """ Returns all the UUID of tracked tasks, completed or pending. 5.79 + 5.80 + @returns: list of UUIDs 5.81 + @rtype: list of strings 5.82 + """ 5.83 + try: 5.84 + tasks_lock.acquire() 5.85 + return tasks.keys() 5.86 + finally: 5.87 + tasks_lock.release() 5.88 + 5.89 +def get_task(task_uuid): 5.90 + """ Retrieves a task by UUID. 5.91 + 5.92 + @rtype: XendTask or None 5.93 + @return: Task denoted by UUID. 5.94 + """ 5.95 + try: 5.96 + tasks_lock.acquire() 5.97 + return tasks.get(task_uuid) 5.98 + finally: 5.99 + tasks_lock.release() 5.100 + 5.101 +def get_tasks_by_name(task_name): 5.102 + """ Retrieves a task by UUID. 5.103 + 5.104 + @rtype: XendTask or None 5.105 + @return: Task denoted by UUID. 5.106 + """ 5.107 + try: 5.108 + tasks_lock.acquire() 5.109 + return [t.uuid for t in tasks if t.name_label == name] 5.110 + finally: 5.111 + tasks_lock.release() 5.112 + 5.113 +
6.1 --- a/tools/python/xen/xend/server/XMLRPCServer.py Wed Jan 24 14:36:03 2007 +0000 6.2 +++ b/tools/python/xen/xend/server/XMLRPCServer.py Wed Jan 24 13:26:26 2007 +0000 6.3 @@ -139,6 +139,8 @@ class XMLRPCServer: 6.4 meth = getattr(self.xenapi, meth_name) 6.5 if callable(meth) and hasattr(meth, 'api'): 6.6 self.server.register_function(meth, getattr(meth, 'api')) 6.7 + 6.8 + self.server.register_instance(XendAPI.XendAPIAsyncProxy(self.xenapi)) 6.9 6.10 # Legacy deprecated xm xmlrpc api 6.11 # --------------------------------------------------------------------