ia64/xen-unstable

view tools/python/xen/xend/server/SrvDaemon.py @ 4072:ee7313088070

bitkeeper revision 1.1159.258.38 (4230628bX3yukmWYtNDh925BckMoHQ)

Merge ssh://xenbk@gandalf.hpl.hp.com//var/bk/xen-2.0-testing.bk
into tetris.cl.cam.ac.uk:/auto/groups/xeno-xenod/BK/xen-2.0-testing.bk
author iap10@tetris.cl.cam.ac.uk
date Thu Mar 10 15:06:51 2005 +0000 (2005-03-10)
parents b555b108ca0d 8a0070a6f1b6
children afa2f6b47e70 552e3748f0e7 c7837f5059f1
line source
1 ###########################################################
2 ## Xen controller daemon
3 ## Copyright (c) 2004, K A Fraser (University of Cambridge)
4 ## Copyright (C) 2004, Mike Wray <mike.wray@hp.com>
5 ###########################################################
7 import os
8 import os.path
9 import signal
10 import sys
11 import threading
12 import linecache
13 import socket
14 import pwd
15 import re
16 import StringIO
17 import traceback
18 import time
20 from twisted.internet import pollreactor
21 pollreactor.install()
23 from twisted.internet import reactor
24 from twisted.internet import protocol
25 from twisted.internet import abstract
26 from twisted.internet import defer
28 from xen.lowlevel import xu
30 from xen.xend import sxp
31 from xen.xend import PrettyPrint
32 from xen.xend import EventServer
33 eserver = EventServer.instance()
34 from xen.xend.XendError import XendError
35 from xen.xend.server import SrvServer
36 from xen.xend import XendRoot
37 from xen.xend.XendLogging import log
39 from xen.util.ip import _readline, _readlines
41 import channel
42 import blkif
43 import netif
44 import console
45 import domain
46 from params import *
# When false, Daemon.daemonize() returns immediately (no setsid, no
# redirection of file descriptors 0-2).
DAEMONIZE = 1
# When true, EventProtocol.dataReceived re-raises request-handling exceptions
# instead of replying with an (err ...) s-expression, and Daemon.daemonize()
# sends both stdout and stderr to the debug log (otherwise only stderr).
DEBUG = 1
class NotifierProtocol(protocol.Protocol):
    """Protocol attached to a NotifierPort.

    Each notification index read from the event channel is looked up in the
    channel factory and, if a channel is found, forwarded to it.
    """

    def __init__(self, channelFactory):
        # Used to map a notification index to the channel that owns it.
        self.channelFactory = channelFactory

    def notificationReceived(self, idx):
        """Forward notification 'idx' to its channel; unknown indices are ignored."""
        chan = self.channelFactory.getChannel(idx)
        if not chan:
            return
        chan.notificationReceived()

    # The remaining lifecycle hooks deliberately do nothing.

    def connectionLost(self, reason=None):
        pass

    def doStart(self):
        pass

    def doStop(self):
        pass

    def startProtocol(self):
        pass

    def stopProtocol(self):
        pass
class NotifierPort(abstract.FileDescriptor):
    """Transport class for the event channel.

    Presents the event-channel notifier to the reactor as a file descriptor
    so that doRead() is called when notifications are pending.
    """

    def __init__(self, daemon, notifier, proto, reactor=None):
        assert isinstance(proto, NotifierProtocol)
        abstract.FileDescriptor.__init__(self, reactor)
        self.daemon = daemon
        self.notifier = notifier
        self.protocol = proto

    def startListening(self):
        """Mark the port connected and start reading notifications."""
        self._bindNotifier()
        self._connectToProtocol()

    def stopListening(self):
        """Close the port.

        @return: a Deferred fired (with None) once connectionLost has run,
                 or None if the port was not connected
        """
        if self.connected:
            result = self.d = defer.Deferred()
        else:
            result = None
        self.loseConnection()
        return result

    def fileno(self):
        return self.notifier.fileno()

    def _bindNotifier(self):
        self.connected = 1

    def _connectToProtocol(self):
        self.protocol.makeConnection(self)
        self.startReading()

    def loseConnection(self):
        """Start closing the port; connectionLost runs on the next reactor turn.

        Does nothing if the port is not connected or is already closing.
        Without that guard, a second call scheduled connectionLost twice,
        and the second run failed because the notifier had been deleted.
        """
        if self.connected and not self.disconnecting:
            self.stopReading()
            self.disconnecting = 1
            reactor.callLater(0, self.connectionLost)

    def connectionLost(self, reason=None):
        """Tear down the port.

        Stops the protocol, closes the notifier's descriptor and fires the
        Deferred returned by stopListening(), if any.
        """
        abstract.FileDescriptor.connectionLost(self, reason)
        if hasattr(self, 'protocol'):
            self.protocol.doStop()
        self.connected = 0
        # self.notifier.close() is not implemented, so close the fd directly.
        os.close(self.fileno())
        del self.notifier
        if hasattr(self, 'd'):
            self.d.callback(None)
            del self.d

    def doRead(self):
        """Drain every pending notification.

        Each notification is passed to the protocol and then unmasked on the
        notifier.
        """
        while 1:
            notification = self.notifier.read()
            if not notification:
                break
            self.protocol.notificationReceived(notification)
            self.notifier.unmask(notification)
class EventProtocol(protocol.Protocol):
    """Asynchronous handler for a connected event socket.

    Requests arrive as s-expressions. Each one is dispatched to the
    matching op_* method, and the result is written back as (ok result).
    Errors are written as (err (type ..) (value ..)) unless DEBUG is set.
    Events this connection subscribes to are written as (event name value).
    """

    def __init__(self, daemon):
        self.daemon = daemon
        # Event queue (not used within this class).
        self.queue = []
        # Event names this connection is currently subscribed to.
        self.events = []
        self.parser = sxp.Parser()
        self.pretty = 0

        # For debugging subscribe to everything and make output pretty.
        self.subscribe(['*'])
        self.pretty = 1

    def dataReceived(self, data):
        """Feed 'data' to the parser and answer every complete request in it.

        One chunk of data may hold several requests, so keep dispatching
        while the parser reports a complete value.
        NOTE(review): this relies on parser.get_val() consuming the value it
        returns — confirm against sxp.Parser.
        """
        try:
            self.parser.input(data)
            while self.parser.ready():
                val = self.parser.get_val()
                res = self.dispatch(val)
                self.send_result(res)
            if self.parser.at_eof():
                self.loseConnection()
        except SystemExit:
            raise
        except:
            if DEBUG:
                raise
            else:
                self.send_error()

    def loseConnection(self):
        """Close the transport and schedule our own connectionLost."""
        if self.transport:
            self.transport.loseConnection()
        if self.connected:
            reactor.callLater(0, self.connectionLost)

    def connectionLost(self, reason=None):
        # May run more than once (explicit callLater above plus the
        # transport's own notification); unsubscribe() tolerates that.
        self.unsubscribe()

    def send_reply(self, sxpr):
        """Write 'sxpr' to the transport, pretty-printed if self.pretty."""
        io = StringIO.StringIO()
        if self.pretty:
            PrettyPrint.prettyprint(sxpr, out=io)
        else:
            sxp.show(sxpr, out=io)
        print >> io
        return self.transport.write(io.getvalue())

    def send_result(self, res):
        return self.send_reply(['ok', res])

    def send_error(self):
        """Reply with the type and value of the exception being handled."""
        (extype, exval) = sys.exc_info()[:2]
        return self.send_reply(['err',
                                ['type', str(extype)],
                                ['value', str(exval)]])

    def send_event(self, val):
        return self.send_reply(['event', val[0], val[1]])

    def unsubscribe(self):
        """Drop all current subscriptions.

        The subscription list is cleared afterwards, so a repeated call does
        not unsubscribe the same events again.
        """
        for event in self.events:
            eserver.unsubscribe(event, self.queue_event)
        self.events = []

    def subscribe(self, events):
        """Replace the current subscriptions with 'events' (event names)."""
        self.unsubscribe()
        for event in events:
            eserver.subscribe(event, self.queue_event)
        # Keep a private copy so later changes to the caller's list have no effect.
        self.events = list(events)

    def queue_event(self, name, v):
        # Despite the name we don't queue the event here.
        # We send it because the transport will queue it.
        self.send_event([name, v])

    def opname(self, name):
        """Map a request name such as 'sys.subscribe' to 'op_sys_subscribe'."""
        return 'op_' + name.replace('.', '_')

    def operror(self, name, req):
        raise XendError('Invalid operation: ' +name)

    def dispatch(self, req):
        """Call the op_* method named by the request, or operror if none."""
        op_name = sxp.name(req)
        op_method_name = self.opname(op_name)
        op_method = getattr(self, op_method_name, self.operror)
        return op_method(op_name, req)

    def op_help(self, name, req):
        """List the names of all supported operations."""
        def nameop(x):
            if x.startswith('op_'):
                return x[3:].replace('_', '.')
            else:
                return x

        l = [ nameop(k) for k in dir(self) if k.startswith('op_') ]
        return l

    def op_quit(self, name, req):
        self.loseConnection()

    def op_exit(self, name, req):
        sys.exit(0)

    def op_pretty(self, name, req):
        self.pretty = 1
        return ['ok']

    def op_console_disconnect(self, name, req):
        id = sxp.child_value(req, 'id')
        if not id:
            raise XendError('Missing console id')
        id = int(id)
        self.daemon.console_disconnect(id)
        return ['ok']

    def op_info(self, name, req):
        val = ['info']
        val += self.daemon.consoles()
        val += self.daemon.blkifs()
        val += self.daemon.netifs()
        return val

    def op_sys_subscribe(self, name, v):
        # (sys.subscribe event*)
        # Subscribe to the events:
        self.subscribe(v[1:])
        return ['ok']

    def op_sys_inject(self, name, v):
        # (sys.inject event)
        event = v[1]
        eserver.inject(sxp.name(event), event)
        return ['ok']

    def op_trace(self, name, v):
        mode = (v[1] == 'on')
        self.daemon.tracing(mode)

    def op_log_stderr(self, name, v):
        mode = v[1]
        logging = XendRoot.instance().get_logging()
        if mode == 'on':
            logging.addLogStderr()
        else:
            logging.removeLogStderr()

    def op_debug_msg(self, name, v):
        mode = v[1]
        import messages
        messages.DEBUG = (mode == 'on')

    def op_debug_controller(self, name, v):
        mode = v[1]
        import controller
        controller.DEBUG = (mode == 'on')
class EventFactory(protocol.Factory):
    """Factory for the event server socket.

    Builds one EventProtocol, bound to the daemon, per accepted connection.
    """
    protocol = EventProtocol
    service = None

    def __init__(self, daemon):
        self.daemon = daemon

    def buildProtocol(self, addr):
        """Create the protocol for a new connection ('addr' is not used)."""
        p = self.protocol(self.daemon)
        p.factory = self
        return p
317 class VirqClient:
318 def __init__(self, daemon):
319 self.daemon = daemon
321 def virqReceived(self, virq):
322 print 'VirqClient.virqReceived>', virq
323 eserver.inject('xend.virq', virq)
325 def lostChannel(self, channel):
326 print 'VirqClient.lostChannel>', channel
328 class Daemon:
329 """The xend daemon.
330 """
331 def __init__(self):
332 self.shutdown = 0
333 self.traceon = 0
335 def daemon_pids(self):
336 pids = []
337 pidex = '(?P<pid>\d+)'
338 pythonex = '(?P<python>\S*python\S*)'
339 cmdex = '(?P<cmd>.*)'
340 procre = re.compile('^\s*' + pidex + '\s*' + pythonex + '\s*' + cmdex + '$')
341 xendre = re.compile('^/usr/sbin/xend\s*(start|restart)\s*.*$')
342 procs = os.popen('ps -e -o pid,args 2>/dev/null')
343 for proc in procs:
344 pm = procre.match(proc)
345 if not pm: continue
346 xm = xendre.match(pm.group('cmd'))
347 if not xm: continue
348 #print 'pid=', pm.group('pid'), 'cmd=', pm.group('cmd')
349 pids.append(int(pm.group('pid')))
350 return pids
352 def new_cleanup(self, kill=0):
353 err = 0
354 pids = self.daemon_pids()
355 if kill:
356 for pid in pids:
357 print "Killing daemon pid=%d" % pid
358 os.kill(pid, signal.SIGHUP)
359 elif pids:
360 err = 1
361 print "Daemon already running: ", pids
362 return err
364 def read_pid(self, pidfile):
365 """Read process id from a file.
367 @param pidfile: file to read
368 @return pid or 0
369 """
370 pid = 0
371 if os.path.isfile(pidfile) and os.path.getsize(pidfile):
372 try:
373 pid = open(pidfile, 'r').read()
374 pid = int(pid)
375 except:
376 pid = 0
377 return pid
379 def find_process(self, pid, name):
380 """Search for a process.
382 @param pid: process id
383 @param name: process name
384 @return: pid if found, 0 otherwise
385 """
386 running = 0
387 if pid:
388 lines = _readlines(os.popen('ps %d 2>/dev/null' % pid))
389 exp = '^ *%d.+%s' % (pid, name)
390 for line in lines:
391 if re.search(exp, line):
392 running = pid
393 break
394 return running
396 def cleanup_process(self, pidfile, name, kill):
397 """Clean up the pidfile for a process.
398 If a running process is found, kills it if 'kill' is true.
400 @param pidfile: pid file
401 @param name: process name
402 @param kill: whether to kill the process
403 @return running process id or 0
404 """
405 running = 0
406 pid = self.read_pid(pidfile)
407 if self.find_process(pid, name):
408 if kill:
409 os.kill(pid, 1)
410 else:
411 running = pid
412 if running == 0 and os.path.isfile(pidfile):
413 os.remove(pidfile)
414 return running
416 def cleanup_xend(self, kill=False):
417 return self.cleanup_process(XEND_PID_FILE, "xend", kill)
419 def cleanup_xfrd(self, kill=False):
420 return self.cleanup_process(XFRD_PID_FILE, "xfrd", kill)
422 def cleanup(self, kill=False):
423 self.cleanup_xend(kill=kill)
424 self.cleanup_xfrd(kill=kill)
426 def status(self):
427 """Returns the status of the xend and xfrd daemons.
428 The return value is defined by the LSB:
429 0 Running
430 3 Not running
431 """
432 if (self.cleanup_process(XEND_PID_FILE, "xend", False) == 0 or
433 self.cleanup_process(XFRD_PID_FILE, "xfrd", False) == 0):
434 return 3
435 else:
436 return 0
438 def install_child_reaper(self):
439 #signal.signal(signal.SIGCHLD, self.onSIGCHLD)
440 # Ensure that zombie children are automatically reaped.
441 xu.autoreap()
443 def onSIGCHLD(self, signum, frame):
444 code = 1
445 while code > 0:
446 code = os.waitpid(-1, os.WNOHANG)
448 def fork_pid(self, pidfile):
449 """Fork and write the pid of the child to 'pidfile'.
451 @param pidfile: pid file
452 @return: pid of child in parent, 0 in child
453 """
454 pid = os.fork()
455 if pid:
456 # Parent
457 pidfile = open(pidfile, 'w')
458 pidfile.write(str(pid))
459 pidfile.close()
460 return pid
462 def start_xfrd(self):
463 """Fork and exec xfrd, writing its pid to XFRD_PID_FILE.
464 """
465 if self.fork_pid(XFRD_PID_FILE):
466 # Parent
467 pass
468 else:
469 # Child
470 os.execl("/usr/sbin/xfrd", "xfrd")
472 def daemonize(self):
473 if not DAEMONIZE: return
474 # Detach from TTY.
475 os.setsid()
477 # Detach from standard file descriptors.
478 # I do this at the file-descriptor level: the overlying Python file
479 # objects also use fd's 0, 1 and 2.
480 os.close(0)
481 os.close(1)
482 os.close(2)
483 if DEBUG:
484 os.open('/dev/null', os.O_RDONLY)
485 # XXX KAF: Why doesn't this capture output from C extensions that
486 # fprintf(stdout) or fprintf(stderr) ??
487 os.open('/var/log/xend-debug.log', os.O_WRONLY|os.O_CREAT)
488 os.dup(1)
489 else:
490 os.open('/dev/null', os.O_RDWR)
491 os.dup(0)
492 os.open('/var/log/xend-debug.log', os.O_WRONLY|os.O_CREAT)
495 def start(self, trace=0):
496 """Attempts to start the daemons.
497 The return value is defined by the LSB:
498 0 Success
499 4 Insufficient privileges
500 """
501 xend_pid = self.cleanup_xend()
502 xfrd_pid = self.cleanup_xfrd()
505 self.daemonize()
507 if self.set_user():
508 return 4
509 os.chdir("/")
511 if xfrd_pid == 0:
512 self.start_xfrd()
513 if xend_pid > 0:
514 # Trying to run an already-running service is a success.
515 return 0
517 self.install_child_reaper()
519 if self.fork_pid(XEND_PID_FILE):
520 #Parent. Sleep to give child time to start.
521 time.sleep(1)
522 else:
523 # Child
524 self.tracing(trace)
525 self.run()
526 return 0
528 def tracing(self, traceon):
529 """Turn tracing on or off.
531 @param traceon: tracing flag
532 """
533 if traceon == self.traceon:
534 return
535 self.traceon = traceon
536 if traceon:
537 self.tracefile = open(XEND_TRACE_FILE, 'w+', 1)
538 self.traceindent = 0
539 sys.settrace(self.trace)
540 try:
541 threading.settrace(self.trace) # Only in Python >= 2.3
542 except:
543 pass
545 def print_trace(self, str):
546 for i in range(self.traceindent):
547 ch = " "
548 if (i % 5):
549 ch = ' '
550 else:
551 ch = '|'
552 self.tracefile.write(ch)
553 self.tracefile.write(str)
555 def trace(self, frame, event, arg):
556 if not self.traceon:
557 print >>self.tracefile
558 print >>self.tracefile, '-' * 20, 'TRACE OFF', '-' * 20
559 self.tracefile.close()
560 self.tracefile = None
561 return None
562 if event == 'call':
563 code = frame.f_code
564 filename = code.co_filename
565 m = re.search('.*xend/(.*)', filename)
566 if not m:
567 return None
568 modulename = m.group(1)
569 if re.search('sxp.py', modulename):
570 return None
571 self.traceindent += 1
572 self.print_trace("> %s:%s\n"
573 % (modulename, code.co_name))
574 elif event == 'line':
575 filename = frame.f_code.co_filename
576 lineno = frame.f_lineno
577 self.print_trace("%4d %s" %
578 (lineno, linecache.getline(filename, lineno)))
579 elif event == 'return':
580 code = frame.f_code
581 filename = code.co_filename
582 m = re.search('.*xend/(.*)', filename)
583 if not m:
584 return None
585 modulename = m.group(1)
586 self.print_trace("< %s:%s\n"
587 % (modulename, code.co_name))
588 self.traceindent -= 1
589 elif event == 'exception':
590 self.print_trace("! Exception:\n")
591 (ex, val, tb) = arg
592 traceback.print_exception(ex, val, tb, 10, self.tracefile)
593 #del tb
594 return self.trace
596 def set_user(self):
597 # Set the UID.
598 try:
599 os.setuid(pwd.getpwnam(USER)[2])
600 return 0
601 except KeyError, error:
602 print "Error: no such user '%s'" % USER
603 return 1
605 def stop(self):
606 return self.cleanup(kill=True)
608 def run(self):
609 xroot = XendRoot.instance()
610 log.info("Xend Daemon started")
611 self.createFactories()
612 self.listenEvent()
613 self.listenNotifier()
614 self.listenVirq()
615 SrvServer.create(bridge=1)
616 reactor.run()
618 def createFactories(self):
619 self.channelF = channel.channelFactory()
620 self.domainCF = domain.DomainControllerFactory()
621 self.blkifCF = blkif.BlkifControllerFactory()
622 self.netifCF = netif.NetifControllerFactory()
623 self.consoleCF = console.ConsoleControllerFactory()
625 def listenEvent(self):
626 protocol = EventFactory(self)
627 return reactor.listenTCP(EVENT_PORT, protocol)
629 def listenNotifier(self):
630 protocol = NotifierProtocol(self.channelF)
631 p = NotifierPort(self, self.channelF.notifier, protocol, reactor)
632 p.startListening()
633 return p
635 def listenVirq(self):
636 virqChan = self.channelF.virqChannel(channel.VIRQ_DOM_EXC)
637 virqChan.registerClient(VirqClient(self))
639 def exit(self):
640 reactor.disconnectAll()
641 sys.exit(0)
643 def getDomChannel(self, dom):
644 """Get the channel to a domain.
646 @param dom: domain
647 @return: channel (or None)
648 """
649 return self.channelF.getDomChannel(dom)
651 def createDomChannel(self, dom, local_port=0, remote_port=0):
652 """Get the channel to a domain, creating if necessary.
654 @param dom: domain
655 @param local_port: optional local port to re-use
656 @param remote_port: optional remote port to re-use
657 @return: channel
658 """
659 return self.channelF.domChannel(dom, local_port=local_port,
660 remote_port=remote_port)
662 def blkif_create(self, dom, recreate=0):
663 """Create or get a block device interface controller.
665 Returns controller
666 """
667 blkif = self.blkifCF.getController(dom)
668 blkif.daemon = self
669 return blkif
671 def blkifs(self):
672 return [ x.sxpr() for x in self.blkifCF.getControllers() ]
674 def blkif_get(self, dom):
675 return self.blkifCF.getControllerByDom(dom)
677 def netif_create(self, dom, recreate=0):
678 """Create or get a network interface controller.
680 """
681 return self.netifCF.getController(dom)
683 def netifs(self):
684 return [ x.sxpr() for x in self.netifCF.getControllers() ]
686 def netif_get(self, dom):
687 return self.netifCF.getControllerByDom(dom)
689 def console_create(self, dom, console_port=None):
690 """Create a console for a domain.
691 """
692 console = self.consoleCF.getControllerByDom(dom)
693 if console is None:
694 console = self.consoleCF.createController(dom, console_port)
695 return console
697 def consoles(self):
698 return [ c.sxpr() for c in self.consoleCF.getControllers() ]
700 def get_consoles(self):
701 return self.consoleCF.getControllers()
703 def get_console(self, id):
704 return self.consoleCF.getControllerByIndex(id)
706 def get_domain_console(self, dom):
707 return self.consoleCF.getControllerByDom(dom)
709 def console_disconnect(self, id):
710 """Disconnect any connected console client.
711 """
712 console = self.get_console(id)
713 if not console:
714 raise XendError('Invalid console id')
715 console.disconnect()
717 def domain_shutdown(self, dom, reason, key=0):
718 """Shutdown a domain.
719 """
720 dom = int(dom)
721 ctrl = self.domainCF.getController(dom)
722 if not ctrl:
723 raise XendError('No domain controller: %s' % dom)
724 ctrl.shutdown(reason, key)
725 return 0
727 def domain_mem_target_set(self, dom, target):
728 """Set memory target for a domain.
729 """
730 dom = int(dom)
731 ctrl = self.domainCF.getController(dom)
732 if not ctrl:
733 raise XendError('No domain controller: %s' % dom)
734 ctrl.mem_target_set(target)
735 return 0
def instance():
    """Return the process-wide Daemon, creating it on first use."""
    global inst
    try:
        inst
    except NameError:
        # First call: the module-level 'inst' has not been bound yet.
        inst = Daemon()
    return inst