+++ /dev/null
-#!/usr/bin/env python
-#
-# This is a save process which also buffers outgoing I/O between
-# rounds, so that external viewers never see anything that hasn't
-# been committed at the backup
-#
-# TODO: fencing.
-
-import optparse, os, re, select, signal, sys, time
-
-from xen.remus import save, util, vm
-from xen.remus.device import ReplicatedDisk, ReplicatedDiskException
-from xen.remus.device import BufferedNIC, BufferedNICException
-from xen.xend import XendOptions
-
-class CfgException(Exception): pass
-
-class Cfg(object):
-
- REMUS_FLAGS_COMPRESSION = 1
-
- def __init__(self):
- # must be set
- self.domid = 0
-
- self.host = 'localhost'
- self.nullremus = False
- self.port = XendOptions.instance().get_xend_relocation_port()
- self.interval = 200
- self.netbuffer = True
- self.flags = self.REMUS_FLAGS_COMPRESSION
- self.timer = False
-
- parser = optparse.OptionParser()
- parser.usage = '%prog [options] domain [destination]'
- parser.add_option('-i', '--interval', dest='interval', type='int',
- metavar='MS',
- help='checkpoint every MS milliseconds')
- parser.add_option('-p', '--port', dest='port', type='int',
- help='send stream to port PORT', metavar='PORT')
- parser.add_option('', '--blackhole', dest='nullremus', action='store_true',
- help='replicate to /dev/null (no disk checkpoints, only memory & net buffering)')
- parser.add_option('', '--no-net', dest='nonet', action='store_true',
- help='run without net buffering (benchmark option)')
- parser.add_option('', '--no-compression', dest='nocompress', action='store_true',
- help='run without checkpoint compression')
- parser.add_option('', '--timer', dest='timer', action='store_true',
- help='force pause at checkpoint interval (experimental)')
- self.parser = parser
-
- def usage(self):
- self.parser.print_help()
-
- def getargs(self):
- opts, args = self.parser.parse_args()
-
- if opts.interval:
- self.interval = opts.interval
- if opts.port:
- self.port = opts.port
- if opts.nullremus:
- self.nullremus = True
- if opts.nonet:
- self.netbuffer = False
- if opts.nocompress:
- self.flags &= ~self.REMUS_FLAGS_COMPRESSION
- if opts.timer:
- self.timer = True
-
- if not args:
- raise CfgException('Missing domain')
- self.domid = args[0]
- if (len(args) > 1):
- self.host = args[1]
-
-class SignalException(Exception): pass
-
-def run(cfg):
- closure = lambda: None
- closure.cmd = None
-
- def sigexception(signo, frame):
- raise SignalException(signo)
-
- def die():
- # I am not sure what the best way to die is. xm destroy is another option,
- # or we could attempt to trigger some instant reboot.
- print "dying..."
- print util.runcmd(['sudo', 'ifdown', 'eth2'])
- # dangling imq0 handle on vif locks up the system
- for buf in bufs:
- buf.uninstall()
- print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
- print util.runcmd(['sudo', 'ifup', 'eth2'])
-
- def getcommand():
- """Get a command to execute while running.
- Commands include:
- s: die prior to postsuspend hook
- s2: die after postsuspend hook
- r: die prior to preresume hook
- r2: die after preresume hook
- c: die prior to commit hook
- c2: die after commit hook
- """
- r, w, x = select.select([sys.stdin], [], [], 0)
- if sys.stdin not in r:
- return
-
- cmd = sys.stdin.readline().strip()
- if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
- print "unknown command: %s" % cmd
- closure.cmd = cmd
-
- signal.signal(signal.SIGTERM, sigexception)
-
- dom = vm.VM(cfg.domid)
-
- # set up I/O buffers
- bufs = []
-
- # disks must commit before network can be released
- if not cfg.nullremus:
- for disk in dom.disks:
- try:
- bufs.append(ReplicatedDisk(disk))
- except ReplicatedDiskException, e:
- print e
- continue
-
- if cfg.netbuffer:
- for vif in dom.vifs:
- bufs.append(BufferedNIC(vif))
-
- if cfg.nullremus:
- fd = save.NullSocket((cfg.host, cfg.port))
- else:
- fd = save.MigrationSocket((cfg.host, cfg.port))
-
- def postsuspend():
- 'Begin external checkpointing after domain has paused'
- if not cfg.timer:
- # when not using a timer thread, sleep until now + interval
- closure.starttime = time.time()
-
- if closure.cmd == 's':
- die()
-
- for buf in bufs:
- buf.postsuspend()
-
- if closure.cmd == 's2':
- die()
-
- def preresume():
- 'Complete external checkpointing before domain resumes'
- if closure.cmd == 'r':
- die()
-
- for buf in bufs:
- buf.preresume()
-
- if closure.cmd == 'r2':
- die()
-
- def commit():
- 'commit network buffer'
- if closure.cmd == 'c':
- die()
-
- print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
-
- for buf in bufs:
- buf.commit()
-
- if closure.cmd == 'c2':
- die()
-
- # Since the domain is running at this point, it's a good time to
- # check for control channel commands
- getcommand()
-
- if not cfg.timer:
- endtime = time.time()
- elapsed = (endtime - closure.starttime) * 1000
-
- if elapsed < cfg.interval:
- time.sleep((cfg.interval - elapsed) / 1000.0)
-
- # False ends checkpointing
- return True
-
- if cfg.timer:
- interval = cfg.interval
- else:
- interval = 0
-
- rc = 0
-
- checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
- interval, cfg.flags)
-
- try:
- checkpointer.start()
- except save.CheckpointError, e:
- print e
- rc = 1
- except KeyboardInterrupt:
- pass
- except SignalException:
- print '*** signalled ***'
-
- for buf in bufs:
- buf.uninstall()
-
- sys.exit(rc)
-
-cfg = Cfg()
-try:
- cfg.getargs()
-except CfgException, inst:
- print str(inst)
- cfg.usage()
- sys.exit(1)
-
-try:
- run(cfg)
-except vm.VMException, inst:
- print str(inst)
- sys.exit(1)