ia64/xen-unstable

changeset 5485:43ae2740ac94

bitkeeper revision 1.1713.3.8 (42b2f91bG45uUFWHhUaUha3e1OAxJQ)

xsnode.py:
Updated watches/event code from Mike Wray.
Signed-off-by: Mike Wray <mike.wray@hp.com>
Signed-off-by: Christian Limpach <Christian.Limpach@cl.cam.ac.uk>
author cl349@firebug.cl.cam.ac.uk
date Fri Jun 17 16:23:55 2005 +0000 (2005-06-17)
parents 7d4dd8aae709
children bf7a872ed421
files tools/python/xen/xend/xenstore/xsnode.py
line diff
     1.1 --- a/tools/python/xen/xend/xenstore/xsnode.py	Fri Jun 17 15:30:49 2005 +0000
     1.2 +++ b/tools/python/xen/xend/xenstore/xsnode.py	Fri Jun 17 16:23:55 2005 +0000
     1.3 @@ -2,7 +2,9 @@ import errno
     1.4  import os
     1.5  import os.path
     1.6  import select
     1.7 +import socket
     1.8  import sys
     1.9 +import threading
    1.10  import time
    1.11  
    1.12  from xen.lowlevel import xs
    1.13 @@ -12,18 +14,26 @@ from xen.xend.PrettyPrint import prettyp
    1.14  SELECT_TIMEOUT = 2.0
    1.15  
    1.16  def getEventPath(event):
    1.17 -    return os.path.join("/_event", event)
    1.18 +    if event and event.startswith("/"):
    1.19 +        event = event[1:]
    1.20 +    return os.path.join("/event", event)
    1.21  
    1.22  def getEventIdPath(event):
    1.23 -    return os.path.join(eventPath(event), "@eid")
    1.24 +    return os.path.join(getEventPath(event), "@eid")
    1.25  
    1.26  class Subscription:
    1.27  
    1.28 -    def __init__(self, event, fn, id):
    1.29 -        self.event = event
    1.30 +    def __init__(self, path, fn, sid):
    1.31 +        self.path = path
    1.32          self.watcher = None
    1.33          self.fn = fn
    1.34 -        self.id = id
    1.35 +        self.sid = sid
    1.36 +
    1.37 +    def getPath(self):
    1.38 +        return self.path
    1.39 +
    1.40 +    def getSid(self):
    1.41 +        return self.sid
    1.42  
    1.43      def watch(self, watcher):
    1.44          self.watcher = watcher
    1.45 @@ -34,10 +44,11 @@ class Subscription:
    1.46          if watcher:
    1.47              self.watcher = None
    1.48              watcher.delSubs(self)
    1.49 +        return watcher
    1.50  
    1.51 -    def notify(self, event):
    1.52 +    def notify(self, path, val):
    1.53          try:
    1.54 -            self.fn(event, id)
    1.55 +            self.fn(self, path, val)
    1.56          except SystemExitException:
    1.57              raise
    1.58          except:
    1.59 @@ -45,45 +56,45 @@ class Subscription:
    1.60  
    1.61  class Watcher:
    1.62  
    1.63 -    def __init__(self, store, event):
    1.64 -        self.path = getEventPath(event)
    1.65 -        self.eidPath = getEventIdPath(event)
    1.66 +    def __init__(self, store, path):
    1.67 +        self.path = path
    1.68          store.mkdirs(self.path)
    1.69 -        if not store.exists(self.eidPath):
    1.70 -            store.writeInt(self.eidPath, 0)
    1.71          self.xs = None
    1.72 -        self.subs = []
    1.73 +        self.subscriptions = []
    1.74  
    1.75 -    def __getattr__(self, k, v):
    1.76 -        if k == "fileno":
    1.77 -            if self.xs:
    1.78 -                return self.xs.fileno
    1.79 -            else:
    1.80 -                return -1
    1.81 +    def fileno(self):
    1.82 +        if self.xs:
    1.83 +            return self.xs.fileno
    1.84          else:
    1.85 -            return self.__dict__.get(k, v)
    1.86 +            return -1
    1.87 +
    1.88 +    def getPath(self):
    1.89 +        return self.path
    1.90  
    1.91      def addSubs(self, subs):
    1.92 -        self.subs.append(subs)
    1.93 +        self.subscriptions.append(subs)
    1.94          self.watch()
    1.95  
    1.96      def delSubs(self, subs):
    1.97 -        self.subs.remove(subs)
    1.98 -        if len(self.subs) == 0:
    1.99 +        self.subscriptions.remove(subs)
   1.100 +        if len(self.subscriptions) == 0:
   1.101              self.unwatch()
   1.102  
   1.103 -    def getEvent(self):
   1.104 -        return self.event
   1.105 -
   1.106      def watch(self):
   1.107          if self.xs: return
   1.108          self.xs = xs.open()
   1.109 -        self.xs.watch(path)
   1.110 +        self.xs.watch(self.path)
   1.111  
   1.112      def unwatch(self):
   1.113          if self.xs:
   1.114 -            self.xs.unwatch(self.path)
   1.115 -            self.xs.close()
   1.116 +            try:
   1.117 +                self.xs.unwatch(self.path)
   1.118 +            except Exception, ex:
   1.119 +                print 'Watcher>unwatch>', ex
   1.120 +            try:
   1.121 +                self.xs.close()
   1.122 +            except Exception, ex:
   1.123 +                pass
   1.124              self.xs = None
   1.125              
   1.126      def watching(self):
   1.127 @@ -92,22 +103,38 @@ class Watcher:
   1.128      def getNotification(self):
   1.129          p = self.xs.read_watch()
   1.130          self.xs.acknowledge_watch()
   1.131 -        eid = self.xs.readInt(self.eidPath)
   1.132          return p
   1.133  
   1.134 -    def notify(self, subs):
   1.135 -        p = self.getNotification()
   1.136 -        for s in subs:
   1.137 -            s.notify(p)
   1.138 -            
   1.139 +    def notify(self):
   1.140 +        try:
   1.141 +            p = self.getNotification()
   1.142 +            v = self.xs.read(p)
   1.143 +            for s in subscriptions:
   1.144 +                s.notify(p, v)
   1.145 +        except Exception, ex:
   1.146 +            print 'Notify exception:', ex
   1.147 +
   1.148 +class EventWatcher(Watcher):
   1.149 +
   1.150 +    def __init__(self, store, path, event):
   1.151 +        Watcher.__init__(self, store, path)
   1.152 +        self.event = event
   1.153 +        self.eidPath = getEventIdPath(event)
   1.154 +        if not store.exists(self.eidPath):
   1.155 +            store.write(self.eidPath, str(0))
   1.156 +
   1.157 +    def getEvent(self):
   1.158 +        return self.event
   1.159 +
   1.160  class XenStore:
   1.161  
   1.162 +    xs = None
   1.163 +    watchThread = None
   1.164 +    subscription_id = 1
   1.165 +    
   1.166      def __init__(self):
   1.167 -        self.xs = None
   1.168 -        #self.xs = xs.open()
   1.169 -        self.subscription = {}
   1.170 -        self.subscription_id = 0
   1.171 -        self.events = {}
   1.172 +        self.subscriptions = {}
   1.173 +        self.watchers = {}
   1.174          self.write("/", "")
   1.175  
   1.176      def getxs(self):
   1.177 @@ -119,8 +146,8 @@ class XenStore:
   1.178                      ex = None
   1.179                      break
   1.180                  except Exception, ex:
   1.181 -                    print >>stderr, "Exception connecting to xsdaemon:", ex
   1.182 -                    print >>stderr, "Trying again..."
   1.183 +                    print >>sys.stderr, "Exception connecting to xsdaemon:", ex
   1.184 +                    print >>sys.stderr, "Trying again..."
   1.185                      time.sleep(1)
   1.186              else:
   1.187                  raise ex
   1.188 @@ -217,70 +244,85 @@ class XenStore:
   1.189          self.getxs().write(path, data, create=create, excl=excl)
   1.190  
   1.191      def begin(self, path):
   1.192 -        self.getxs().begin_transaction(path)
   1.193 +        self.getxs().transaction_start(path)
   1.194  
   1.195      def commit(self, abandon=False):
   1.196 -        self.getxs().end_transaction(abort=abandon)
   1.197 +        self.getxs().transaction_end(abort=abandon)
   1.198 +
   1.199 +    def watch(self, path, fn):
   1.200 +        watcher = self.watchers.get(path)
   1.201 +        if not watcher:
   1.202 +            watcher = self.addWatcher(Watcher(self, path))
   1.203 +        return self.addSubscription(watcher, fn)
   1.204 +        
   1.205 +    def unwatch(self, sid):
   1.206 +        s = self.subscriptions.get(sid)
   1.207 +        if not s: return
   1.208 +        del self.subscriptions[s.sid]
   1.209 +        watcher = s.unwatch()
   1.210 +        if watcher and not watcher.watching():
   1.211 +            del self.watchers[path]
   1.212  
   1.213      def subscribe(self, event, fn):
   1.214 -        watcher = self.watchEvent(event)
   1.215 -        self.subscription_id += 1
   1.216 -        subs = Subscription(event, fn, self.subscription_id)
   1.217 -        self.subscription[subs.id] = subs
   1.218 -        subs.watch(watcher)
   1.219 -        return subs.id
   1.220 +        path = getEventPath(event)
   1.221 +        watcher = self.watchers.get(path)
   1.222 +        if not watcher:
   1.223 +            watcher = self.addWatcher(EventWatcher(self, path, event))
   1.224 +        return self.addSubscription(watcher, fn)
   1.225  
   1.226 -    def unsubscribe(self, sid):
   1.227 -        s = self.subscription.get(sid)
   1.228 -        if not s: return
   1.229 -        del self.subscription[s.id]
   1.230 -        s.unwatch()
   1.231 -        unwatchEvent(s.event)
   1.232 +    unsubscribe = unwatch
   1.233  
   1.234      def sendEvent(self, event, data):
   1.235          eventPath = getEventPath(event)
   1.236          eidPath = getEventIdPath(event)
   1.237          try:
   1.238 -            self.begin(eventPath)
   1.239 +            #self.begin(eventPath)
   1.240              self.mkdirs(eventPath)
   1.241 +            eid = 1
   1.242              if self.exists(eidPath):
   1.243 -                eid = self.readInt(eidPath)
   1.244 -                eid += 1
   1.245 -            else:
   1.246 -                eid = 1
   1.247 -            self.writeInt(eidPath, eid)
   1.248 +                data = self.read(eidPath)
   1.249 +                print 'sendEvent>', 'data=', data, type(data)
   1.250 +                try:
   1.251 +                    eid = int(self.read(eidPath))
   1.252 +                    eid += 1
   1.253 +                except Exception, ex:
   1.254 +                    print 'sendEvent>', ex
   1.255 +                    pass
   1.256 +            self.write(eidPath, str(eid))
   1.257              self.write(os.path.join(eventPath, str(eid)), data)
   1.258          finally:
   1.259 -            self.commit()
   1.260 +            #self.commit()
   1.261 +            pass
   1.262  
   1.263 -    def watchEvent(self, event):
   1.264 -        if event in  self.events:
   1.265 -            return
   1.266 -        watcher = Watcher(event)
   1.267 -        self.watchers[watcher.getEvent()] = watcher
   1.268 +    def addWatcher(self, watcher):
   1.269 +        self.watchers[watcher.getPath()] = watcher
   1.270          self.watchStart()
   1.271          return watcher
   1.272  
   1.273 -    def unwatchEvent(self, event):
   1.274 -        watcher = self.watchers.get(event)
   1.275 -        if not watcher:
   1.276 -            return
   1.277 -        if not watcher.watching():
   1.278 -            del self.watchers[event]
   1.279 +    def addSubscription(self, watcher, fn):
   1.280 +        self.subscription_id += 1
   1.281 +        subs = Subscription(watcher.getPath(), fn, self.subscription_id)
   1.282 +        self.subscriptions[subs.sid] = subs
   1.283 +        subs.watch(watcher)
   1.284 +        return subs.sid
   1.285  
   1.286      def watchStart(self):
   1.287          if self.watchThread: return
   1.288 -
   1.289 +        self.watchThread = threading.Thread(name="Watcher",
   1.290 +                                            target=self.watchMain)
   1.291 +        self.watchThread.setDaemon(True)
   1.292 +        self.watchThread.start()
   1.293 +        
   1.294      def watchMain(self):
   1.295          try:
   1.296              while True:
   1.297                  if self.watchThread is None: return
   1.298 -                if not self.events:
   1.299 +                if not self.watchers:
   1.300                      return
   1.301                  rd = self.watchers.values()
   1.302                  try:
   1.303 -                    (rd, wr, er) = select.select(rd, [], [], SELECT_TIMEOUT)
   1.304 -                    for watcher in rd:
   1.305 +                    (srd, swr, ser) = select.select(rd, [], [], SELECT_TIMEOUT)
   1.306 +                    for watcher in srd:
   1.307                          watcher.notify()
   1.308                  except socket.error, ex:
   1.309                      if ex.args[0] in (EAGAIN, EINTR):
   1.310 @@ -315,6 +357,9 @@ class XenNode:
   1.311              else:
   1.312                  raise ValueError("path does not exist: '%s'" % path)
   1.313  
   1.314 +    def getStore(self):
   1.315 +        return self.store
   1.316 +
   1.317      def relPath(self, path=""):
   1.318          if not path:
   1.319              return self.path
   1.320 @@ -376,6 +421,24 @@ class XenNode:
   1.321      def releaseDomain(self, dom):
   1.322          self.store.releaseDomain(dom)
   1.323  
   1.324 +    def watch(self, fn, path=""):
   1.325 +        """Watch a path for changes. The path is relative
   1.326 +        to the node and defaults to the node itself.
   1.327 +        """
   1.328 +        return self.store.watch(self.relPath(path), fn)
   1.329 +
   1.330 +    def unwatch(self, sid):
   1.331 +        return self.store.unwatch(sid)
   1.332 +
   1.333 +    def subscribe(self, event, fn):
   1.334 +        return self.store.subscribe(event, fn)
   1.335 +
   1.336 +    def unsubscribe(self, sid):
   1.337 +        self.store.unsubscribe(sid)
   1.338 +
   1.339 +    def sendEvent(self, event, data):
   1.340 +        return self.store.sendEvent(event, data)
   1.341 +
   1.342      def __repr__(self):
   1.343          return "<XenNode %s>" % self.path
   1.344