ia64/xen-unstable

changeset 8289:76bff6c996b0

Strip huge piles of cruft from the connection infrastructure. We now actually
block inside accept rather than using select to poll and then calling accept
regardless of the outcome of the select call, and then failing because the
socket is non-blocking.

SocketClientConnection, SocketConnector, TCPClientConnection, TCPConnector,
connectTCP, UnixClientConnection, UnixConnector, connectUnix have gone.

loseConnection and stopListening and closeSocket (where they are needed) are
now called close. startListening is now called listen.

Closes bug #379.

Relieves a weight from the shoulders of the universe.

Signed-off-by: Ewan Mellor <ewan@xensource.com>
author emellor@leeni.uk.xensource.com
date Thu Dec 08 15:04:31 2005 +0000 (2005-12-08)
parents 231686596796
children c9772105fead
files tools/python/xen/web/connection.py tools/python/xen/web/protocol.py tools/python/xen/web/tcp.py tools/python/xen/web/unix.py tools/python/xen/xend/server/relocate.py
line diff
     1.1 --- a/tools/python/xen/web/connection.py	Thu Dec 08 14:30:15 2005 +0000
     1.2 +++ b/tools/python/xen/web/connection.py	Thu Dec 08 15:04:31 2005 +0000
     1.3 @@ -30,11 +30,8 @@ specifying what kind of socket they are.
     1.4  for TCP and unix-domain sockets (see tcp.py and unix.py).
     1.5  """
     1.6  
     1.7 -"""We make sockets non-blocking so that operations like accept()
     1.8 -don't block. We also select on a timeout. Otherwise we have no way
     1.9 -of getting the threads to shutdown.
    1.10 -"""
    1.11 -SELECT_TIMEOUT = 2.0
    1.12 +BUFFER_SIZE = 1024
    1.13 +
    1.14  
    1.15  class SocketServerConnection:
    1.16      """An accepted connection to a server.
    1.17 @@ -45,73 +42,35 @@ class SocketServerConnection:
    1.18          self.protocol = protocol
    1.19          self.addr = addr
    1.20          self.server = server
    1.21 -        self.buffer_n = 1024
    1.22 -        self.thread = None
    1.23          self.protocol.setTransport(self)
    1.24  
    1.25 +
    1.26      def run(self):
    1.27 -        self.thread = threading.Thread(target=self.main)
    1.28 -        self.thread.start()
    1.29 +        threading.Thread(target=self.main).start()
    1.30 +
    1.31  
    1.32      def main(self):
    1.33 -        while True:
    1.34 -            if not self.thread: break
    1.35 -            if self.select(): break
    1.36 -            if not self.thread: break
    1.37 -            data = self.read()
    1.38 -            if data is None: continue
    1.39 -            if data is True: break
    1.40 -            if self.dataReceived(data): break
    1.41 -
    1.42 -    def select(self):
    1.43          try:
    1.44 -            select.select([self.sock], [], [], SELECT_TIMEOUT)
    1.45 -            return False
    1.46 -        except socket.error, ex:
    1.47 -            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
    1.48 -                return False
    1.49 -            else:
    1.50 -                self.loseConnection(ex)
    1.51 -                return True
    1.52 +            while True:
    1.53 +                try:
    1.54 +                    data = self.sock.recv(BUFFER_SIZE)
    1.55 +                    if data == '':
    1.56 +                        break
    1.57 +                    if self.protocol.dataReceived(data):
    1.58 +                        break
    1.59 +                except socket.error, ex:
    1.60 +                    if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR):
    1.61 +                        break
    1.62 +        finally:
    1.63 +            try:
    1.64 +                self.sock.close()
    1.65 +            except:
    1.66 +                pass
    1.67  
    1.68 -    def read(self):
    1.69 -        try:
    1.70 -            data = self.sock.recv(self.buffer_n)
    1.71 -            if data == '':
    1.72 -                self.loseConnection()
    1.73 -                return True
    1.74 -            return data
    1.75 -        except socket.error, ex:
    1.76 -            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
    1.77 -                return None
    1.78 -            else:
    1.79 -                self.loseConnection(ex)
    1.80 -                return True
    1.81 -
    1.82 -    def dataReceived(self, data):
    1.83 -        try:
    1.84 -            self.protocol.dataReceived(data)
    1.85 -        except SystemExit:
    1.86 -            raise
    1.87 -        except Exception, ex:
    1.88 -            self.loseConnection(ex)
    1.89 -            return True
    1.90 -        return False
    1.91  
    1.92      def write(self, data):
    1.93          self.sock.send(data)
    1.94  
    1.95 -    def loseConnection(self, reason=None):
    1.96 -        self.thread = None
    1.97 -        self.closeSocket(reason)
    1.98 -
    1.99 -    def closeSocket(self, reason):
   1.100 -        try:
   1.101 -            self.sock.close()
   1.102 -        except SystemExit:
   1.103 -            raise
   1.104 -        except:
   1.105 -            pass
   1.106  
   1.107  class SocketListener:
   1.108      """A server socket, running listen in a thread.
   1.109 @@ -126,192 +85,44 @@ class SocketListener:
   1.110          self.backlog = backlog
   1.111          self.thread = None
   1.112  
   1.113 +
   1.114      def createSocket(self):
   1.115          raise NotImplementedError()
   1.116  
   1.117 +
   1.118      def setCloExec(self):
   1.119          fcntl.fcntl(self.sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
   1.120  
   1.121 +
   1.122      def acceptConnection(self, sock, protocol, addr):
   1.123          return SocketServerConnection(sock, protocol, addr, self)
   1.124  
   1.125 -    def startListening(self):
   1.126 +
   1.127 +    def listen(self):
   1.128          if self.sock or self.thread:
   1.129              raise IOError("already listening")
   1.130          self.sock = self.createSocket()
   1.131 -        self.sock.setblocking(0)
   1.132          self.sock.listen(self.backlog)
   1.133          self.run()
   1.134  
   1.135 -    def stopListening(self, reason=None):
   1.136 -        self.loseConnection(reason)
   1.137  
   1.138      def run(self):
   1.139          self.thread = threading.Thread(target=self.main)
   1.140          self.thread.start()
   1.141  
   1.142 -    def main(self):
   1.143 -        while True:
   1.144 -            if not self.thread: break
   1.145 -            if self.select(): break
   1.146 -            if self.accept(): break
   1.147 -
   1.148 -    def select(self):
   1.149 -        try:
   1.150 -            select.select([self.sock], [], [], SELECT_TIMEOUT)
   1.151 -            return False
   1.152 -        except socket.error, ex:
   1.153 -            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
   1.154 -                return False
   1.155 -            else:
   1.156 -                self.loseConnection(ex)
   1.157 -                return True
   1.158 -
   1.159 -    def accept(self):
   1.160 -        try:
   1.161 -            (sock, addr) = self.sock.accept()
   1.162 -            sock.setblocking(0)
   1.163 -            return self.accepted(sock, addr)
   1.164 -        except socket.error, ex:
   1.165 -            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
   1.166 -                return False
   1.167 -            else:
   1.168 -                self.loseConnection(ex)
   1.169 -                return True
   1.170 -
   1.171 -    def accepted(self, sock, addr):
   1.172 -        self.acceptConnection(sock, self.protocol_class(), addr).run()
   1.173 -        return False
   1.174 -
   1.175 -    def loseConnection(self, reason=None):
   1.176 -        self.thread = None
   1.177 -        self.closeSocket(reason)
   1.178 -
   1.179 -    def closeSocket(self, reason):
   1.180 -        try:
   1.181 -            self.sock.close()
   1.182 -        except SystemExit:
   1.183 -            raise
   1.184 -        except Exception, ex:
   1.185 -            pass
   1.186 -
   1.187 -
   1.188 -class SocketClientConnection:
   1.189 -    """A connection to a server from a client.
   1.190 -
   1.191 -    Call connectionMade() on the protocol in a thread when connected.
   1.192 -    It is completely up to the protocol what to do.
   1.193 -    """
   1.194 -
   1.195 -    def __init__(self, connector):
   1.196 -        self.addr = None
   1.197 -        self.connector = connector
   1.198 -        self.buffer_n = 1024
   1.199 -
   1.200 -    def createSocket (self):
   1.201 -        raise NotImplementedError()
   1.202 -
   1.203 -    def write(self, data):
   1.204 -        if self.sock:
   1.205 -            return self.sock.send(data)
   1.206 -        else:
   1.207 -            return 0
   1.208 -
   1.209 -    def connect(self, timeout):
   1.210 -        #todo: run a timer to cancel on timeout?
   1.211 -        try:
   1.212 -            sock = self.createSocket()
   1.213 -            sock.connect(self.addr)
   1.214 -            self.sock = sock
   1.215 -            self.protocol = self.connector.protocol_class()
   1.216 -            self.protocol.setTransport(self)
   1.217 -        except SystemExit:
   1.218 -            raise
   1.219 -        except Exception, ex:
   1.220 -            self.connector.connectionFailed(ex)
   1.221 -            return False
   1.222 -
   1.223 -        self.thread = threading.Thread(target=self.main)
   1.224 -        #self.thread.setDaemon(True)
   1.225 -        self.thread.start()
   1.226 -        return True
   1.227  
   1.228      def main(self):
   1.229          try:
   1.230 -            # Call the protocol in a thread.
   1.231 -            # Up to it what to do.
   1.232 -            self.protocol.connectionMade(self.addr)
   1.233 -        except SystemExit:
   1.234 -            raise
   1.235 -        except Exception, ex:
   1.236 -            self.loseConnection(ex)
   1.237 -
   1.238 -    def mainLoop(self):
   1.239 -        # Something a protocol could call.
   1.240 -        while True:
   1.241 -            if not self.thread: break
   1.242 -            if self.select(): break
   1.243 -            if not self.thread: break
   1.244 -            data = self.read()
   1.245 -            if data is None: continue
   1.246 -            if data is True: break
   1.247 -            if self.dataReceived(data): break
   1.248 -
   1.249 -    def select(self):
   1.250 -        try:
   1.251 -            select.select([self.sock], [], [], SELECT_TIMEOUT)
   1.252 -            return False
   1.253 -        except socket.error, ex:
   1.254 -            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
   1.255 -                return False
   1.256 -            else:
   1.257 -                self.loseConnection(ex)
   1.258 -                return True
   1.259 -
   1.260 -    def read(self):
   1.261 -        try:
   1.262 -            data = self.sock.recv(self.buffer_n)
   1.263 -            return data
   1.264 -        except socket.error, ex:
   1.265 -            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
   1.266 -                return None
   1.267 -            else:
   1.268 -                self.loseConnection(ex)
   1.269 -                return True
   1.270 -        
   1.271 -    def dataReceived(self, data):
   1.272 -        if not self.protocol:
   1.273 -            return True
   1.274 -        try:
   1.275 -            self.protocol.dataReceived(data)
   1.276 -        except SystemExit:
   1.277 -            raise
   1.278 -        except Exception, ex:
   1.279 -            self.loseConnection(ex)
   1.280 -            return True
   1.281 -        return False
   1.282 -
   1.283 -    def loseConnection(self, reason=None):
   1.284 -        self.thread = None
   1.285 -        self.closeSocket(reason)
   1.286 -
   1.287 -    def closeSocket(self, reason):
   1.288 -        try:
   1.289 -            if self.sock:
   1.290 +            while True:
   1.291 +                try:
   1.292 +                    (sock, addr) = self.sock.accept()
   1.293 +                    self.acceptConnection(sock, self.protocol_class(),
   1.294 +                                          addr).run()
   1.295 +                except socket.error, ex:
   1.296 +                    if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR):
   1.297 +                        break
   1.298 +        finally:
   1.299 +            try:
   1.300                  self.sock.close()
   1.301 -        except SystemExit:
   1.302 -            raise
   1.303 -        except:
   1.304 -            pass
   1.305 -
   1.306 -class SocketConnector:
   1.307 -    """A client socket. Connects to a server and runs the client protocol
   1.308 -    in a thread.
   1.309 -    """
   1.310 -
   1.311 -    def __init__(self, protocol_class):
   1.312 -        self.protocol_class = protocol_class
   1.313 -        self.transport = None
   1.314 -
   1.315 -    def connect(self):
   1.316 -        pass
   1.317 +            except:
   1.318 +                pass
     2.1 --- a/tools/python/xen/web/protocol.py	Thu Dec 08 14:30:15 2005 +0000
     2.2 +++ b/tools/python/xen/web/protocol.py	Thu Dec 08 15:04:31 2005 +0000
     2.3 @@ -25,7 +25,7 @@ class Protocol:
     2.4          self.transport = transport
     2.5  
     2.6      def dataReceived(self, data):
     2.7 -        print 'Protocol>dataReceived>'
     2.8 +        raise NotImplementedError()
     2.9  
    2.10      def write(self, data):
    2.11          if self.transport:
     3.1 --- a/tools/python/xen/web/tcp.py	Thu Dec 08 14:30:15 2005 +0000
     3.2 +++ b/tools/python/xen/web/tcp.py	Thu Dec 08 15:04:31 2005 +0000
     3.3 @@ -13,16 +13,16 @@
     3.4  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
     3.5  #============================================================================
     3.6  # Copyright (C) 2005 Mike Wray <mike.wray@hp.com>
     3.7 +# Copyright (C) 2005 XenSource Ltd.
     3.8  #============================================================================
     3.9  
    3.10 -import sys
    3.11 +
    3.12  import socket
    3.13 -import types
    3.14  import time
    3.15  import errno
    3.16  
    3.17  from connection import *
    3.18 -from protocol import *
    3.19 +
    3.20  
    3.21  class TCPListener(SocketListener):
    3.22  
    3.23 @@ -52,48 +52,8 @@ class TCPListener(SocketListener):
    3.24      def acceptConnection(self, sock, protocol, addr):
    3.25          return SocketServerConnection(sock, protocol, addr, self)
    3.26  
    3.27 -class TCPClientConnection(SocketClientConnection):
    3.28 -
    3.29 -    def __init__(self, host, port, bindAddress, connector):
    3.30 -        SocketClientConnection.__init__(self, connector)
    3.31 -        self.addr = (host, port)
    3.32 -        self.bindAddress = bindAddress
    3.33 -
    3.34 -    def createSocket(self):
    3.35 -        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    3.36 -        if self.bindAddress is not None:
    3.37 -            sock.bind(self.bindAddress)
    3.38 -        return sock
    3.39 -    
    3.40 -class TCPConnector(SocketConnector):
    3.41 -
    3.42 -    def __init__(self, host, port, protocol, timeout=None, bindAddress=None):
    3.43 -        SocketConnector.__init__(self, protocol)
    3.44 -        self.host = host
    3.45 -        self.port = self.servicePort(port)
    3.46 -        self.bindAddress = bindAddress
    3.47 -        self.timeout = timeout
    3.48 -
    3.49 -    def servicePort(self, port):
    3.50 -        if isinstance(port, types.StringTypes):
    3.51 -            try:
    3.52 -                port = socket.getservbyname(port, 'tcp')
    3.53 -            except socket.error, ex:
    3.54 -                raise IOError("unknown service: " + ex)
    3.55 -        return port
    3.56 -
    3.57 -    def connect(self):
    3.58 -        self.transport = TCPClientConnection(
    3.59 -            self.host, self.port, self.bindAddress, self)
    3.60 -        self.transport.connect(self.timeout)
    3.61  
    3.62  def listenTCP(port, protocol, interface='', backlog=None):
    3.63      l = TCPListener(port, protocol, interface=interface, backlog=backlog)
    3.64 -    l.startListening()
    3.65 -    return l
    3.66 -
    3.67 -def connectTCP(host, port, protocol, timeout=None, bindAddress=None):
    3.68 -    c = TCPConnector(host, port, protocol, timeout=timeout,
    3.69 -                     bindAddress=bindAddress)
    3.70 -    c.connect()
    3.71 -    return c
    3.72 +    l.listen()
    3.73 +    l.setCloExec()
     4.1 --- a/tools/python/xen/web/unix.py	Thu Dec 08 14:30:15 2005 +0000
     4.2 +++ b/tools/python/xen/web/unix.py	Thu Dec 08 15:04:31 2005 +0000
     4.3 @@ -16,13 +16,13 @@
     4.4  # Copyright (C) 2005 XenSource Ltd.
     4.5  #============================================================================
     4.6  
     4.7 -import sys
     4.8 +
     4.9  import socket
    4.10  import os
    4.11  import os.path
    4.12  
    4.13  from connection import *
    4.14 -from protocol import *
    4.15 +
    4.16  
    4.17  class UnixListener(SocketListener):
    4.18  
    4.19 @@ -48,33 +48,6 @@ class UnixListener(SocketListener):
    4.20      def acceptConnection(self, sock, protocol, addr):
    4.21          return SocketServerConnection(sock, protocol, self.path, self)
    4.22  
    4.23 -class UnixClientConnection(SocketClientConnection):
    4.24 -
    4.25 -    def __init__(self, addr, connector):
    4.26 -        SocketClientConnection.__init__(self, connector)
    4.27 -        self.addr = addr
    4.28 -        
    4.29 -    def createSocket(self):
    4.30 -        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    4.31 -        return sock
    4.32 -    
    4.33 -class UnixConnector(SocketConnector):
    4.34 -
    4.35 -    def __init__(self, path, protocol, timeout=None):
    4.36 -        SocketConnector.__init__(self, protocol)
    4.37 -        self.addr = path
    4.38 -        self.timeout = timeout
    4.39 -
    4.40 -    def connect(self):
    4.41 -        self.transport = UnixClientConnection(self.addr, self)
    4.42 -        self.transport.connect(self.timeout)
    4.43  
    4.44  def listenUNIX(path, protocol, backlog=None):
    4.45 -    l = UnixListener(path, protocol, backlog=backlog)
    4.46 -    l.startListening()
    4.47 -    return l
    4.48 -
    4.49 -def connectUNIX(path, protocol, timeout=None):
    4.50 -    c = UnixConnector(path, protocol, timeout=timeout)
    4.51 -    c.connect()
    4.52 -    return c
    4.53 +    UnixListener(path, protocol, backlog=backlog).listen()
     5.1 --- a/tools/python/xen/xend/server/relocate.py	Thu Dec 08 14:30:15 2005 +0000
     5.2 +++ b/tools/python/xen/xend/server/relocate.py	Thu Dec 08 15:04:31 2005 +0000
     5.3 @@ -44,15 +44,15 @@ class RelocationProtocol(protocol.Protoc
     5.4                  res = self.dispatch(val)
     5.5                  self.send_result(res)
     5.6              if self.parser.at_eof():
     5.7 -                self.loseConnection()
     5.8 +                self.close()
     5.9          except SystemExit:
    5.10              raise
    5.11          except:
    5.12              self.send_error()
    5.13  
    5.14 -    def loseConnection(self):
    5.15 +    def close(self):
    5.16          if self.transport:
    5.17 -            self.transport.loseConnection()
    5.18 +            self.transport.close()
    5.19  
    5.20      def send_reply(self, sxpr):
    5.21          io = StringIO.StringIO()
    5.22 @@ -100,15 +100,13 @@ class RelocationProtocol(protocol.Protoc
    5.23          return l
    5.24  
    5.25      def op_quit(self, _1, _2):
    5.26 -        self.loseConnection()
    5.27 +        self.close()
    5.28  
    5.29      def op_receive(self, name, _):
    5.30          if self.transport:
    5.31              self.send_reply(["ready", name])
    5.32 -            self.transport.sock.setblocking(1)
    5.33              XendDomain.instance().domain_restore_fd(
    5.34                  self.transport.sock.fileno())
    5.35 -            self.transport.sock.setblocking(0)
    5.36          else:
    5.37              log.error(name + ": no transport")
    5.38              raise XendError(name + ": no transport")
    5.39 @@ -122,5 +120,4 @@ def listenRelocation():
    5.40      if xroot.get_xend_relocation_server():
    5.41          port = xroot.get_xend_relocation_port()
    5.42          interface = xroot.get_xend_relocation_address()
    5.43 -        l = tcp.listenTCP(port, RelocationProtocol, interface=interface)
    5.44 -        l.setCloExec()
    5.45 +        tcp.listenTCP(port, RelocationProtocol, interface=interface)