ia64/xen-unstable

changeset 4618:40af907d69a9

bitkeeper revision 1.1327.2.3 (4267b4918M714ImdecocSvKqAkVj1A)

Add some locking to console handling.
Remove a dead file.

Signed-off-by: Mike Wray <mike.wray@hp.com>
author mjw@wray-m-3.hpl.hp.com
date Thu Apr 21 14:11:29 2005 +0000 (2005-04-21)
parents a838a908e38e
children d781b9d08e80
files .rootkeys tools/python/xen/xend/EventTypes.py tools/python/xen/xend/server/console.py
line diff
     1.1 --- a/.rootkeys	Thu Apr 21 13:25:07 2005 +0000
     1.2 +++ b/.rootkeys	Thu Apr 21 14:11:29 2005 +0000
     1.3 @@ -895,7 +895,6 @@ 4267a9b2XqvzKDWxfAdV22c3mO6NHA tools/pyt
     1.4  40c9c468SNuObE_YWARyS0hzTPSzKg tools/python/xen/xend/Args.py
     1.5  41597996WNvJA-DVCBmc0xU9w_XmoA tools/python/xen/xend/Blkctl.py
     1.6  40c9c468Um_qc66OQeLEceIz1pgD5g tools/python/xen/xend/EventServer.py
     1.7 -40c9c468U8EVl0d3G--8YXVg6VJD3g tools/python/xen/xend/EventTypes.py
     1.8  40c9c468QJTEuk9g4qHxGpmIi70PEQ tools/python/xen/xend/PrettyPrint.py
     1.9  40e15b7eeQxWE_hUPB2YTgM9fsZ1PQ tools/python/xen/xend/Vifctl.py
    1.10  4151594bBq8h-bwTfEt8dbBuojMtcA tools/python/xen/xend/XendAsynchProtocol.py
     2.1 --- a/tools/python/xen/xend/EventTypes.py	Thu Apr 21 13:25:07 2005 +0000
     2.2 +++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
     2.3 @@ -1,34 +0,0 @@
     2.4 -#   Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
     2.5 -
     2.6 -## XEND_DOMAIN_CREATE = "xend.domain.create": dom
     2.7 -## create: 
     2.8 -## xend.domain.destroy: dom, reason:died/crashed
     2.9 -## xend.domain.up ?
    2.10 -
    2.11 -## xend.domain.unpause: dom
    2.12 -## xend.domain.pause: dom
    2.13 -## xend.domain.shutdown: dom
    2.14 -## xend.domain.destroy: dom
    2.15 -
    2.16 -## xend.domain.migrate.begin: dom, to
    2.17 -## Begin tells: src host, src domain uri, dst host. Dst id known?
    2.18 -## err: src host, src domain uri, dst host, dst id if known, status (of domain: ok, dead,...), reason
    2.19 -## end: src host, src domain uri, dst host, dst uri
    2.20 -
    2.21 -## Events for both ends of migrate: for exporter and importer?
    2.22 -## Include migrate id so can tie together.
    2.23 -## Have uri /xend/migrate/<id> for migrate info (migrations in progress).
    2.24 -
    2.25 -## (xend.domain.migrate.begin (src <host>) (src.domain <id>)
    2.26 -##                            (dst <host>) (id <migrate id>))
    2.27 - 
    2.28 -## xend.domain.migrate.end:
    2.29 -## (xend.domain.migrate.end (domain <id>) (to <host>)
    2.30 -
    2.31 -## xend.node.up:  xend uri
    2.32 -## xend.node.down: xend uri
    2.33 -
    2.34 -## xend.error ?
    2.35 -
    2.36 -## format:
    2.37 -
     3.1 --- a/tools/python/xen/xend/server/console.py	Thu Apr 21 13:25:07 2005 +0000
     3.2 +++ b/tools/python/xen/xend/server/console.py	Thu Apr 21 14:11:29 2005 +0000
     3.3 @@ -1,6 +1,7 @@
     3.4  # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
     3.5  
     3.6  import socket
     3.7 +import threading
     3.8  
     3.9  from xen.web import reactor, protocol
    3.10  
    3.11 @@ -86,6 +87,7 @@ class ConsoleDev(Dev):
    3.12  
    3.13      def __init__(self, controller, id, config, recreate=False):
    3.14          Dev.__init__(self, controller, id, config)
    3.15 +        self.lock = threading.RLock()
    3.16          self.status = self.STATUS_NEW
    3.17          self.addr = None
    3.18          self.conn = None
    3.19 @@ -107,9 +109,13 @@ class ConsoleDev(Dev):
    3.20                         [self.id, self.getDomain(), self.console_port])
    3.21  
    3.22      def init(self, recreate=False, reboot=False):
    3.23 -        self.destroyed = False
    3.24 -        self.channel = self.getChannel()
    3.25 -        self.listen()
    3.26 +        try:
    3.27 +            self.lock.acquire()
    3.28 +            self.destroyed = False
    3.29 +            self.channel = self.getChannel()
    3.30 +            self.listen()
    3.31 +        finally:
    3.32 +            self.lock.release()
    3.33  
    3.34      def checkConsolePort(self, console_port):
    3.35          """Check that a console port is not in use by another console.
    3.36 @@ -121,29 +127,41 @@ class ConsoleDev(Dev):
    3.37              ctrl.checkConsolePort(console_port)
    3.38      
    3.39      def sxpr(self):
    3.40 -        val = ['console',
    3.41 -               ['status', self.status ],
    3.42 -               ['id',     self.id    ],
    3.43 -               ['domain', self.getDomain() ] ]
    3.44 -        val.append(['local_port',   self.getLocalPort()  ])
    3.45 -        val.append(['remote_port',  self.getRemotePort() ])
    3.46 -        val.append(['console_port', self.console_port    ])
    3.47 -        val.append(['index', self.getIndex()])
    3.48 -        if self.addr:
    3.49 -            val.append(['connected', self.addr[0], self.addr[1]])
    3.50 +        try:
    3.51 +            self.lock.acquire()
    3.52 +            val = ['console',
    3.53 +                   ['status', self.status ],
    3.54 +                   ['id',     self.id    ],
    3.55 +                   ['domain', self.getDomain() ] ]
    3.56 +            val.append(['local_port',   self.getLocalPort()  ])
    3.57 +            val.append(['remote_port',  self.getRemotePort() ])
    3.58 +            val.append(['console_port', self.console_port    ])
    3.59 +            val.append(['index', self.getIndex()])
    3.60 +            if self.addr:
    3.61 +                val.append(['connected', self.addr[0], self.addr[1]])
    3.62 +        finally:
    3.63 +            self.lock.release()
    3.64          return val
    3.65  
    3.66      def getLocalPort(self):
    3.67 -        if self.channel:
    3.68 -            return self.channel.getLocalPort()
    3.69 -        else:
    3.70 -            return 0
    3.71 +        try:
    3.72 +            self.lock.acquire()
    3.73 +            if self.channel:
    3.74 +                return self.channel.getLocalPort()
    3.75 +            else:
    3.76 +                return 0
    3.77 +        finally:
    3.78 +            self.lock.release()
    3.79  
    3.80      def getRemotePort(self):
    3.81 -        if self.channel:
    3.82 -            return self.channel.getRemotePort()
    3.83 -        else:
    3.84 -            return 0
    3.85 +        try:
    3.86 +            self.lock.acquire()
    3.87 +            if self.channel:
    3.88 +                return self.channel.getRemotePort()
    3.89 +            else:
    3.90 +                return 0
    3.91 +        finally:
    3.92 +            self.lock.release()
    3.93  
    3.94      def uri(self):
    3.95          """Get the uri to use to connect to the console.
    3.96 @@ -166,23 +184,31 @@ class ConsoleDev(Dev):
    3.97          print 'ConsoleDev>destroy>', self, reboot
    3.98          if reboot:
    3.99              return
   3.100 -        self.status = self.STATUS_CLOSED
   3.101 -        if self.conn:
   3.102 -            self.conn.loseConnection()
   3.103 -        self.listener.stopListening()
   3.104 +        try:
   3.105 +            self.lock.acquire()
   3.106 +            self.status = self.STATUS_CLOSED
   3.107 +            if self.conn:
   3.108 +                self.conn.loseConnection()
   3.109 +            self.listener.stopListening()
   3.110 +        finally:
   3.111 +            self.lock.release()
   3.112  
   3.113      def listen(self):
   3.114          """Listen for TCP connections to the console port..
   3.115          """
   3.116 -        if self.closed():
   3.117 -            return
   3.118 -        if self.listener:
   3.119 -            pass
   3.120 -        else:
   3.121 -            self.status = self.STATUS_LISTENING
   3.122 -            cf = ConsoleFactory(self, self.id)
   3.123 -            interface = xroot.get_console_address()
   3.124 -            self.listener = reactor.listenTCP(self.console_port, cf, interface=interface)
   3.125 +        try:
   3.126 +            self.lock.acquire()
   3.127 +            if self.closed():
   3.128 +                return
   3.129 +            if self.listener:
   3.130 +                pass
   3.131 +            else:
   3.132 +                self.status = self.STATUS_LISTENING
   3.133 +                cf = ConsoleFactory(self, self.id)
   3.134 +                interface = xroot.get_console_address()
   3.135 +                self.listener = reactor.listenTCP(self.console_port, cf, interface=interface)
   3.136 +        finally:
   3.137 +            self.lock.release()
   3.138  
   3.139      def connect(self, addr, conn):
   3.140          """Connect a TCP connection to the console.
   3.141 @@ -193,27 +219,35 @@ class ConsoleDev(Dev):
   3.142  
   3.143          returns 0 if ok, negative otherwise
   3.144          """
   3.145 -        if self.closed():
   3.146 -            return -1
   3.147 -        if self.connected():
   3.148 -            return -1
   3.149 -        self.addr = addr
   3.150 -        self.conn = conn
   3.151 -        self.status = self.STATUS_CONNECTED
   3.152 -        self.writeOutput()
   3.153 +        try:
   3.154 +            self.lock.acquire()
   3.155 +            if self.closed():
   3.156 +                return -1
   3.157 +            if self.connected():
   3.158 +                return -1
   3.159 +            self.addr = addr
   3.160 +            self.conn = conn
   3.161 +            self.status = self.STATUS_CONNECTED
   3.162 +            self.writeOutput()
   3.163 +        finally:
   3.164 +            self.lock.release()
   3.165          return 0
   3.166  
   3.167      def disconnect(self, conn=None):
   3.168          """Disconnect the TCP connection to the console.
   3.169          """
   3.170          print 'ConsoleDev>disconnect>', conn
   3.171 -        if conn and conn != self.conn: return
   3.172 -        if self.conn:
   3.173 -            self.conn.loseConnection()
   3.174 -        self.addr = None
   3.175 -        self.conn = None
   3.176 -        self.status = self.STATUS_LISTENING
   3.177 -        self.listen()
   3.178 +        try:
   3.179 +            self.lock.acquire()
   3.180 +            if conn and conn != self.conn: return
   3.181 +            if self.conn:
   3.182 +                self.conn.loseConnection()
   3.183 +            self.addr = None
   3.184 +            self.conn = None
   3.185 +            self.status = self.STATUS_LISTENING
   3.186 +            self.listen()
   3.187 +        finally:
   3.188 +            self.lock.release()
   3.189  
   3.190      def receiveOutput(self, msg):
   3.191          """Receive output console data from the console channel.
   3.192 @@ -223,30 +257,38 @@ class ConsoleDev(Dev):
   3.193          subtype minor message typ
   3.194          """
   3.195          # Treat the obuf as a ring buffer.
   3.196 -        data = msg.get_payload()
   3.197 -        data_n = len(data)
   3.198 -        if self.obuf.space() < data_n:
   3.199 -            self.obuf.discard(data_n)
   3.200 -        if self.obuf.space() < data_n:
   3.201 -            data = data[-self.obuf.space():]
   3.202 -        self.obuf.write(data)
   3.203 -        self.writeOutput()
   3.204 +        try:
   3.205 +            self.lock.acquire()
   3.206 +            data = msg.get_payload()
   3.207 +            data_n = len(data)
   3.208 +            if self.obuf.space() < data_n:
   3.209 +                self.obuf.discard(data_n)
   3.210 +            if self.obuf.space() < data_n:
   3.211 +                data = data[-self.obuf.space():]
   3.212 +            self.obuf.write(data)
   3.213 +            self.writeOutput()
   3.214 +        finally:
   3.215 +            self.lock.release()
   3.216          
   3.217      def writeOutput(self):
   3.218          """Handle buffered output from the console device.
   3.219          Sends it to the connected TCP connection (if any).
   3.220          """
   3.221 -        if self.closed():
   3.222 -            return -1
   3.223 -        if not self.conn:
   3.224 -            return 0
   3.225 -        while not self.obuf.empty():
   3.226 -            try:
   3.227 -                bytes = self.conn.write(self.obuf.peek())
   3.228 -                if bytes > 0:
   3.229 -                    self.obuf.discard(bytes)
   3.230 -            except socket.error:
   3.231 -                pass
   3.232 +        try:
   3.233 +            self.lock.acquire()
   3.234 +            if self.closed():
   3.235 +                return -1
   3.236 +            if not self.conn:
   3.237 +                return 0
   3.238 +            while not self.obuf.empty():
   3.239 +                try:
   3.240 +                    bytes = self.conn.write(self.obuf.peek())
   3.241 +                    if bytes > 0:
   3.242 +                        self.obuf.discard(bytes)
   3.243 +                except socket.error:
   3.244 +                    pass
   3.245 +        finally:
   3.246 +            self.lock.release()
   3.247          return 0
   3.248      
   3.249      def receiveInput(self, conn, data):
   3.250 @@ -257,20 +299,28 @@ class ConsoleDev(Dev):
   3.251          conn connection
   3.252          data input data
   3.253          """
   3.254 -        if self.closed(): return -1
   3.255 -        if conn != self.conn: return 0
   3.256 -        self.ibuf.write(data)
   3.257 -        self.writeInput()
   3.258 +        try:
   3.259 +            self.lock.acquire()
   3.260 +            if self.closed(): return -1
   3.261 +            if conn != self.conn: return 0
   3.262 +            self.ibuf.write(data)
   3.263 +            self.writeInput()
   3.264 +        finally:
   3.265 +            self.lock.release()
   3.266          return 0
   3.267  
   3.268      def writeInput(self):
   3.269          """Write pending console input to the console channel.
   3.270          Writes as much to the channel as it can.
   3.271          """
   3.272 -        while self.channel and not self.ibuf.empty():
   3.273 -            msg = xu.message(CMSG_CONSOLE, 0, 0)
   3.274 -            msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD))
   3.275 -            self.channel.writeRequest(msg)
   3.276 +        try:
   3.277 +            self.lock.acquire()
   3.278 +            while self.channel and not self.ibuf.empty():
   3.279 +                msg = xu.message(CMSG_CONSOLE, 0, 0)
   3.280 +                msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD))
   3.281 +                self.channel.writeRequest(msg)
   3.282 +        finally:
   3.283 +            self.lock.release()
   3.284  
   3.285  class ConsoleController(DevController):
   3.286      """Device controller for all the consoles for a domain.