ia64/xen-unstable

view tools/python/xen/xend/XendMigrate.py @ 2570:30a9b33481dc

bitkeeper revision 1.1159.90.2 (415be9d5hTw1zLV9fA-AYcekmwhMwg)

Discard devices early for local migrate.
author mjw@wray-m-3.hpl.hp.com
date Thu Sep 30 11:11:17 2004 +0000 (2004-09-30)
parents f7b2e90dac20
children 3af1c3b54760
line source
1 # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
3 import traceback
5 import errno
6 import sys
7 import socket
8 import time
10 from twisted.internet import reactor
11 from twisted.internet import defer
12 #defer.Deferred.debug = 1
13 from twisted.internet.protocol import Protocol
14 from twisted.internet.protocol import ClientFactory
16 import sxp
17 import XendDB
18 import EventServer; eserver = EventServer.instance()
19 from XendError import XendError
20 from XendLogging import log
# The port for the migrate/save daemon xfrd.
XFRD_PORT = 8002

# The transfer protocol major version number.
XFR_PROTO_MAJOR = 1

# The transfer protocol minor version number.
XFR_PROTO_MINOR = 0
class Xfrd(Protocol):
    """Protocol handler for one connection to the migration/save daemon xfrd.

    Bytes received are fed to an sxp parser; a parsed value is handed to the
    session object (xinfo) for dispatch.  Requests are written to the
    transport as s-expressions.
    """

    def __init__(self, xinfo):
        """Create the handler.

        @param xinfo: session object; this class calls its request(),
                      dispatch() and connectionLost() methods
        """
        self.xinfo = xinfo
        self.parser = sxp.Parser()

    def connectionMade(self):
        """Send the protocol hello, then let the session send its request."""
        hello = ['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR]
        self.request(hello)
        self.xinfo.request(self)

    def request(self, req):
        """Write the s-expression req to the connection.

        @param req: request (sxp list)
        """
        sxp.show(req, out=self.transport)

    def loseConnection(self):
        """Close the connection to xfrd."""
        print('Xfrd>loseConnection>')
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """Forward the loss of the connection to the session object.

        @param reason: why the connection was lost
        """
        print('%s %s' % ('Xfrd>connectionLost>', reason))
        self.xinfo.connectionLost(reason)

    def dataReceived(self, data):
        """Parse incoming data and dispatch a completed value to the session.

        @param data: bytes received from xfrd
        """
        self.parser.input(data)
        # NOTE(review): at most one parsed value is dispatched per chunk of
        # data; if the parser can hold several completed values, the rest
        # wait for the next chunk -- confirm against sxp.Parser.
        if self.parser.ready():
            self.xinfo.dispatch(self, self.parser.get_val())
        if self.parser.at_eof():
            self.loseConnection()
class XfrdClientFactory(ClientFactory):
    """Factory for clients of the migration/save daemon xfrd.

    Every protocol it builds is bound to the same session object, and a
    failed connection attempt is reported to that session.
    """

    def __init__(self, xinfo):
        """Remember the session object.

        @param xinfo: session object passed to each Xfrd protocol
        """
        # The base-class initialiser is not invoked; the call below was
        # already commented out.
        #ClientFactory.__init__(self)
        self.xinfo = xinfo

    def startedConnecting(self, connector):
        """Trace the start of a connection attempt."""
        print('%s %s %s %s %s' % ('Started to connect',
                                  'self=', self,
                                  'connector=', connector))

    def buildProtocol(self, addr):
        """Create the protocol handler for a new connection.

        @param addr: address of the peer
        @return: Xfrd protocol bound to this factory's session
        """
        print('%s %s' % ('buildProtocol>', addr))
        protocol = Xfrd(self.xinfo)
        return protocol

    def clientConnectionLost(self, connector, reason):
        """Trace the loss of an established connection (no other action)."""
        print('%s %s %s %s %s' % ('clientConnectionLost>',
                                  'connector=', connector,
                                  'reason=', reason))

    def clientConnectionFailed(self, connector, reason):
        """Report a failed connection attempt to the session.

        @param reason: why the connection could not be made
        """
        print('%s %s %s %s %s' % ('clientConnectionFailed>',
                                  'connector=', connector,
                                  'reason=', reason))
        self.xinfo.error(reason)
class XfrdInfo:
    """Abstract class for info about a session with xfrd.
    Has subclasses for save and migrate.

    Subclasses are expected to set self.src_dom and self.state; this class
    reads the former in vmconfig() and writes the latter in error().
    Messages from xfrd are routed by dispatch() to methods named after the
    message ('xfr.vm.pause' -> xfr_vm_pause).
    """

    # Suspend timeout (seconds).
    # We set a timeout because suspending a domain can hang.
    timeout = 30

    def __init__(self):
        # Imported inside the method rather than at module level;
        # presumably to avoid a circular import -- TODO confirm.
        from xen.xend import XendDomain
        self.xd = XendDomain.instance()
        # Fired with the session on success, or with the error on failure.
        self.deferred = defer.Deferred()
        # Domains suspended / paused on behalf of xfrd, keyed by domain id.
        # Used by connectionLost() to clean up.
        self.suspended = {}
        self.paused = {}

    def vmconfig(self):
        """Get the configuration of the source domain as a string.

        @return: sxp string, or None if the domain was not found
        """
        dominfo = self.xd.domain_get(self.src_dom)
        if not dominfo:
            return None
        return sxp.to_string(dominfo.sxpr())

    def error(self, err):
        """Mark the session as failed and fire the deferred's errback,
        unless the deferred has already been called.

        @param err: error value passed to the errback
        """
        print('%s %s' % ('Error>', err))
        self.state = 'error'
        if not self.deferred.called:
            print('Error> calling errback')
            self.deferred.errback(err)

    def dispatch(self, xfrd, val):
        """Route a message from xfrd to its handler and send back the reply.

        Only operations whose name starts with 'xfr.' are looked up; anything
        else goes to unknown().  A handler may return a reply, None (no
        reply) or a Deferred that eventually yields one of those.

        @param xfrd: protocol handler the message arrived on
        @param val:  parsed message (sxp)
        """

        def cbok(v):
            # None means the handler has nothing to send back.
            if v is None:
                return
            sxp.show(v, out=xfrd.transport)

        def cberr(err):
            # Tell xfrd the operation failed, then fail the session.
            v = ['xfr.err', errno.EINVAL]
            sxp.show(v, out=xfrd.transport)
            self.error(err)

        op = sxp.name(val)
        op = op.replace('.', '_')
        if op.startswith('xfr_'):
            fn = getattr(self, op, self.unknown)
        else:
            fn = self.unknown
        val = fn(xfrd, val)
        if isinstance(val, defer.Deferred):
            val.addCallback(cbok)
            val.addErrback(cberr)
        else:
            cbok(val)

    def unknown(self, xfrd, val):
        """Handle an unrecognised message by dropping the connection."""
        print('%s %s' % ('unknown>', val))
        xfrd.loseConnection()
        return None

    def xfr_err(self, xfrd, val):
        """Handle an 'xfr.err' message from xfrd."""
        # If we get an error with non-zero code the operation failed.
        # An error with code zero indicates hello success.
        print('%s %s' % ('xfr_err>', val))
        v = sxp.child0(val)
        print('%s %s %s' % ('xfr_err>', type(v), v))
        err = int(v)
        if not err:
            return None
        self.error(err)
        xfrd.loseConnection()
        return None

    def xfr_progress(self, xfrd, val):
        """Handle a progress report: it is only traced."""
        print('%s %s' % ('xfr_progress>', val))
        return None

    def xfr_vm_destroy(self, xfrd, val):
        """Destroy a domain at xfrd's request and forget any pause/suspend
        bookkeeping for it.

        @return: ['xfr.err', result]; result is errno.EINVAL on failure
        """
        print('%s %s' % ('xfr_vm_destroy>', val))
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_destroy(vmid)
            if vmid in self.paused:
                del self.paused[vmid]
            if vmid in self.suspended:
                del self.suspended[vmid]
        except Exception:
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_pause(self, xfrd, val):
        """Pause a domain at xfrd's request and remember that we did.

        @return: ['xfr.err', result]; result is errno.EINVAL on failure
        """
        print('%s %s' % ('xfr_vm_pause>', val))
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_pause(vmid)
            self.paused[vmid] = 1
        except Exception:
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_unpause(self, xfrd, val):
        """Unpause a domain at xfrd's request.

        @return: ['xfr.err', result]; result is errno.EINVAL on failure
        """
        print('%s %s' % ('xfr_vm_unpause>', val))
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_unpause(vmid)
            if vmid in self.paused:
                del self.paused[vmid]
        except Exception:
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_suspend(self, xfrd, val):
        """Suspend a domain. Suspending takes time, so we return
        a Deferred that is called when the suspend completes.
        Suspending can hang, so we set a timeout and fail if it
        takes too long.

        @return: Deferred on success; ['xfr.err', errno.EINVAL] if the
                 suspend could not be started
        """
        print('%s %s' % ('xfr_vm_suspend>', val))
        try:
            vmid = sxp.child0(val)
            d = defer.Deferred()

            # Subscribe to 'suspended' events so we can tell when the
            # suspend completes. Subscribe to 'died' events so we can tell if
            # the domain died. Set a timeout and error handler so the
            # subscriptions will be cleaned up if suspending hangs or there
            # is an error.
            def onSuspended(e, v):
                print('%s %s %s' % ('xfr_vm_suspend>onSuspended>', e, v))
                if v[1] != vmid:
                    return
                subscribe(on=0)
                d.callback(v)

            def onDied(e, v):
                print('%s %s %s' % ('xfr_vm_suspend>onDied>', e, v))
                if v[1] != vmid:
                    return
                # The errback chain runs cberr, which unsubscribes.
                d.errback(XendError('Domain died'))

            def subscribe(on=1):
                if on:
                    action = eserver.subscribe
                else:
                    action = eserver.unsubscribe
                action('xend.domain.suspended', onSuspended)
                action('xend.domain.died', onDied)

            def cberr(err):
                print('%s %s' % ('xfr_vm_suspend>cberr>', err))
                subscribe(on=0)
                return err

            subscribe()
            try:
                self.xd.domain_shutdown(vmid, reason='suspend')
                self.suspended[vmid] = 1
                d.addErrback(cberr)
                d.setTimeout(self.timeout)
            except:
                # Bare except is intentional: the exception is always
                # re-raised.  Nothing will ever fire d on this path, so
                # remove the handlers now rather than leaking them.
                subscribe(on=0)
                raise
            return d
        except Exception:
            err = sys.exc_info()[1]
            print('%s %s' % ('xfr_vm_suspend> Exception', err))
            traceback.print_exc()
            val = errno.EINVAL
        return ['xfr.err', val]

    def connectionLost(self, reason=None):
        """Clean up after the connection to xfrd goes away: destroy domains
        still recorded as suspended and unpause those still recorded as
        paused.

        @param reason: why the connection was lost
        """
        print('%s %s' % ('XfrdInfo>connectionLost>', reason))
        for vmid in self.suspended:
            try:
                self.xd.domain_destroy(vmid)
            except Exception:
                # Best effort: one failure must not stop the clean-up.
                pass
        for vmid in self.paused:
            try:
                self.xd.domain_unpause(vmid)
            except Exception:
                # Best effort: one failure must not stop the clean-up.
                pass
class XendMigrateInfo(XfrdInfo):
    """Representation of a migrate in-progress and its interaction with xfrd.
    """

    def __init__(self, xid, dominfo, host, port, live):
        """Record the parameters of a migration.

        @param xid:     session id
        @param dominfo: info for the domain being migrated
        @param host:    destination host
        @param port:    destination port
        @param live:    live-migration flag, passed through to xfrd
        """
        XfrdInfo.__init__(self)
        self.xid = xid
        self.state = 'begin'
        self.live = live
        self.start = 0
        # Source side.
        self.dominfo = dominfo
        self.src_host = socket.gethostname()
        self.src_dom = dominfo.id
        # Destination side; the domain id is filled in by xfr_migrate_ok().
        self.dst_host = host
        self.dst_port = port
        self.dst_dom = None

    def sxpr(self):
        """Describe the session as an s-expression.

        @return: ['migrate', id, state, live, src, dst]; dst contains the
                 destination domain only once it is known
        """
        src = ['src', ['host', self.src_host], ['domain', self.src_dom]]
        dst = ['dst', ['host', self.dst_host]]
        if self.dst_dom:
            dst.append(['domain', self.dst_dom])
        return ['migrate',
                ['id', self.xid],
                ['state', self.state],
                ['live', self.live],
                src,
                dst]

    def request(self, xfrd):
        """Send the migrate request to xfrd.

        Drops the connection instead if the source domain's configuration
        cannot be obtained.

        @param xfrd: protocol handler to send the request on
        """
        vmconfig = self.vmconfig()
        if not vmconfig:
            xfrd.loseConnection()
            return
        log.info('Migrate BEGIN: ' + str(self.sxpr()))
        begin_event = [self.dominfo.name, self.dominfo.id,
                       "begin", self.sxpr()]
        eserver.inject('xend.domain.migrate', begin_event)
        # Special case for localhost: destroy all devices early.
        # The leading '0 and' keeps this branch switched off.
        if 0 and self.dst_host in ["localhost", "127.0.0.1"]:
            self.dominfo.restart_cancel()
            self.dominfo.cleanup()
            #self.dominfo.destroy_console()
        migrate_req = ['xfr.migrate',
                       self.src_dom,
                       vmconfig,
                       self.dst_host,
                       self.dst_port,
                       self.live]
        xfrd.request(migrate_req)

    def xfr_migrate_ok(self, xfrd, val):
        """Handle xfrd's report that the migration succeeded: record the
        destination domain, destroy the source domain and fire the deferred.

        @param val: message whose first child is the new domain id
        """
        self.dst_dom = int(sxp.child0(val))
        self.state = 'ok'
        self.xd.domain_destroy(self.src_dom)
        if self.deferred.called:
            return
        self.deferred.callback(self)

    def connectionLost(self, reason=None):
        """Finish the session when the connection to xfrd closes.

        Anything other than state 'ok' counts as a failed migration.  The
        outcome is logged and injected as an 'xend.domain.migrate' event.
        """
        print('%s %s' % ('XfrdMigrateInfo>connectionLost>', reason))
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            outcome = 'Migrate OK: '
        else:
            self.state = 'error'
            self.error(XendError("migrate failed"))
            outcome = 'Migrate ERROR: '
        log.info(outcome + str(self.sxpr()))
        end_event = [self.dominfo.name, self.dominfo.id,
                     self.state, self.sxpr()]
        eserver.inject('xend.domain.migrate', end_event)
class XendSaveInfo(XfrdInfo):
    """Representation of a save in-progress and its interaction with xfrd.
    """

    def __init__(self, xid, dominfo, file):
        """Record the parameters of a save.

        @param xid:     session id
        @param dominfo: info for the domain being saved
        @param file:    destination file
        """
        XfrdInfo.__init__(self)
        self.xid = xid
        self.dominfo = dominfo
        self.state = 'begin'
        self.src_dom = dominfo.id
        self.file = file
        self.start = 0

    def sxpr(self):
        """Describe the session as an s-expression.

        @return: ['save', id, state, domain, file]
        """
        sxpr = ['save',
                ['id', self.xid],
                ['state', self.state],
                ['domain', self.src_dom],
                ['file', self.file]]
        return sxpr

    def request(self, xfrd):
        """Send the save request to xfrd.

        Drops the connection instead if the source domain's configuration
        cannot be obtained.

        @param xfrd: protocol handler to send the request on
        """
        # Fetch the configuration once and reuse it for both the trace and
        # the request, instead of serialising the domain twice.
        vmconfig = self.vmconfig()
        print('%s %s' % ('***request>', vmconfig))
        if not vmconfig:
            xfrd.loseConnection()
            return
        print('***request> begin')
        log.info('Save BEGIN: ' + str(self.sxpr()))
        eserver.inject('xend.domain.save',
                       [self.dominfo.name, self.dominfo.id,
                        "begin", self.sxpr()])
        xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file])

    def xfr_save_ok(self, xfrd, val):
        """Handle xfrd's report that the save succeeded: destroy the saved
        domain and fire the deferred."""
        self.state = 'ok'
        self.xd.domain_destroy(self.src_dom)
        if not self.deferred.called:
            self.deferred.callback(self)

    def connectionLost(self, reason=None):
        """Finish the session when the connection to xfrd closes.

        Anything other than state 'ok' counts as a failed save.  The
        outcome is logged and injected as an 'xend.domain.save' event.
        """
        print('%s %s' % ('XfrdSaveInfo>connectionLost>', reason))
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            log.info('Save OK: ' + str(self.sxpr()))
        else:
            self.state = 'error'
            self.error(XendError("save failed"))
            log.info('Save ERROR: ' + str(self.sxpr()))
        eserver.inject('xend.domain.save',
                       [self.dominfo.name, self.dominfo.id,
                        self.state, self.sxpr()])
class XendMigrate:
    """External api for interaction with xfrd for migrate and save.
    Singleton.

    Live sessions are kept in self.session; their s-expression form is
    mirrored in self.session_db and written to the XendDB store.
    """
    # Use log for indications of begin/end/errors?
    # Need logging of: domain create/halt, migrate begin/end/fail
    # Log via event server?

    # Path of the session database handed to XendDB.
    dbpath = "migrate"

    def __init__(self):
        self.db = XendDB.XendDB(self.dbpath)
        self.session = {}
        self.session_db = self.db.fetchall("")
        # Counter behind nextid().
        self.xid = 0

    def nextid(self):
        """Allocate a new session id.

        @return: id as a decimal string
        """
        self.xid = self.xid + 1
        return "%d" % self.xid

    def sync(self):
        """Write all session records to the database."""
        self.db.saveall("", self.session_db)

    def sync_session(self, xid):
        """Write one session record to the database.

        @param xid: session id
        """
        record = self.session_db[xid]
        print('%s %s %s %s' % ('sync_session>', type(xid), xid, record))
        self.db.save(xid, self.session_db[xid])

    def close(self):
        pass

    def _add_session(self, info):
        """Register a session in both tables and persist its record."""
        key = info.xid
        self.session[key] = info
        self.session_db[key] = info.sxpr()
        self.sync_session(key)

    def _delete_session(self, xid):
        """Remove a session from both tables; the stored record is deleted
        only if the session had one."""
        print('%s %s' % ('***_delete_session>', xid))
        if xid in self.session:
            del self.session[xid]
        if xid in self.session_db:
            del self.session_db[xid]
            self.db.delete(xid)

    def session_ls(self):
        """Get the ids of the current sessions."""
        return self.session.keys()

    def sessions(self):
        """Get the current session objects."""
        return self.session.values()

    def session_get(self, xid):
        """Look up a session by id.

        @return: session, or None if there is no such id
        """
        return self.session.get(xid)

    def session_begin(self, info):
        """Add the session to the table and start it.
        Set up callbacks to remove the session from the table
        when it finishes.

        @param info: session
        @return: deferred
        """
        def forget(result):
            # Runs on success and on failure; passes the result through so
            # later callbacks/errbacks still see it.
            print('%s %s' % ('***cbremove>', result))
            self._delete_session(info.xid)
            return result

        self._add_session(info)
        finished = info.deferred
        finished.addCallback(forget)
        finished.addErrback(forget)
        factory = XfrdClientFactory(info)
        reactor.connectTCP('localhost', XFRD_PORT, factory)
        return info.deferred

    def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0):
        """Begin to migrate a domain to another host.

        @param dominfo: domain info
        @param host: destination host
        @param port: destination port
        @param live: live-migration flag, passed through to the session
        @return: deferred
        """
        session = XendMigrateInfo(self.nextid(), dominfo, host, port, live)
        return self.session_begin(session)

    def save_begin(self, dominfo, file):
        """Begin saving a domain to file.

        @param dominfo: domain info
        @param file: destination file
        @return: deferred
        """
        session = XendSaveInfo(self.nextid(), dominfo, file)
        return self.session_begin(session)
def instance():
    """Get the XendMigrate singleton, creating it on first use.

    @return: the shared XendMigrate instance
    """
    global inst
    try:
        inst
    except NameError:
        # First call: the module-level name does not exist yet.  Only
        # NameError is caught so that unrelated errors are not hidden.
        inst = XendMigrate()
    return inst