ia64/xen-unstable

view tools/python/xen/xend/xenstore/xsnode.py @ 6813:c1450b657ede

g/c introduceDomain and releaseDomain bindings in xsnode and xsobj.
Signed-off-by: Christian Limpach <Christian.Limpach@cl.cam.ac.uk>
author cl349@firebug.cl.cam.ac.uk
date Tue Sep 13 17:32:59 2005 +0000 (2005-09-13)
parents dd668f7527cb
children 9af349b055e5 3233e7ecfa9f
line source
1 #============================================================================
2 # This library is free software; you can redistribute it and/or
3 # modify it under the terms of version 2.1 of the GNU Lesser General Public
4 # License as published by the Free Software Foundation.
5 #
6 # This library is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9 # Lesser General Public License for more details.
10 #
11 # You should have received a copy of the GNU Lesser General Public
12 # License along with this library; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14 #============================================================================
15 # Copyright (C) 2005 Mike Wray <mike.wray@hp.com>
16 #============================================================================
17 import errno
18 import os
19 import os.path
20 import select
21 import socket
22 import sys
23 import threading
24 import time
26 from xen.lowlevel import xs
27 from xen.xend import sxp
28 from xen.xend.PrettyPrint import prettyprint
# Timeout, in seconds, passed to select.select() by XenStore.watchMain().
30 SELECT_TIMEOUT = 2.0
def getEventPath(event):
    """Return the store path for an event name.

    A single leading "/" on the event name is dropped and the result is
    joined onto "/event".
    """
    if not event or not event.startswith("/"):
        return os.path.join("/event", event)
    return os.path.join("/event", event[1:])
def getEventIdPath(event):
    """Return the path of the "@eid" node below the event's store path."""
    event_dir = getEventPath(event)
    return os.path.join(event_dir, "@eid")
class Subscription:
    """A callback registered against a store path.

    A subscription is identified by its sid and is attached to at most
    one watcher at a time.
    """

    def __init__(self, path, fn, sid):
        self.sid = sid
        self.fn = fn
        self.path = path
        # Watcher this subscription is currently attached to, if any.
        self.watcher = None

    def getPath(self):
        """Return the path this subscription was created for."""
        return self.path

    def getSid(self):
        """Return the subscription id."""
        return self.sid

    def watch(self, watcher):
        """Attach to `watcher` and register with it via addSubs()."""
        self.watcher = watcher
        watcher.addSubs(self)

    def unwatch(self):
        """Detach from the current watcher.

        Returns the watcher that was attached (after calling its
        delSubs()), or the falsy stored value if none was attached.
        """
        previous = self.watcher
        if not previous:
            return previous
        # Clear our reference first, then deregister from the watcher.
        self.watcher = None
        previous.delSubs(self)
        return previous

    def notify(self, token, path, val):
        """Invoke the callback as fn(subscription, token, path, val).

        Errors raised by the callback are deliberately swallowed;
        SystemExit is allowed to propagate.
        """
        try:
            self.fn(self, token, path, val)
        except SystemExit:
            raise
        except Exception:
            pass
class Watcher:
    """A watch on one store path, fanning notifications out to subscriptions.

    The watcher opens its own xs connection while it has at least one
    subscription and closes it again when the last one is removed.
    """

    def __init__(self, store, path):
        self.path = path
        # Make sure the watched node exists in the store.
        store.mkdirs(self.path)
        # Dedicated xs connection; None while not watching.
        self.xs = None
        self.subscriptions = []

    def fileno(self):
        """Return the connection's file descriptor, or -1 when not watching."""
        if self.xs:
            return self.xs.fileno()
        return -1

    def getPath(self):
        """Return the watched path."""
        return self.path

    def getToken(self):
        """Return the token used when registering the watch (the path)."""
        return self.path

    def addSubs(self, subs):
        """Add a subscription and make sure the watch is registered."""
        self.subscriptions.append(subs)
        self.watch()

    def delSubs(self, subs):
        """Remove a subscription; stop watching when none are left.

        Raises ValueError (from list.remove) if `subs` is not registered.
        """
        self.subscriptions.remove(subs)
        if not self.subscriptions:
            self.unwatch()

    def watch(self):
        """Open a connection and register the watch, unless already watching."""
        if self.xs:
            return
        self.xs = xs.open()
        self.xs.watch(path=self.getPath(), token=self.getToken())

    def unwatch(self):
        """Close the watch connection, if one is open.

        NOTE: the watch is not explicitly unregistered with xs.unwatch();
        the original author noted that doing so possibly crashes xenstored,
        so the connection is simply closed.
        """
        if self.xs:
            try:
                self.xs.close()
            except Exception:
                # Best effort: a failing close must not prevent the reset.
                pass
            self.xs = None

    def watching(self):
        """Return True while a watch connection is open."""
        return self.xs is not None

    def getNotification(self):
        """Read one pending watch notification and acknowledge it.

        Returns the value of read_watch(), which notify() unpacks as
        (path, token).
        """
        p = self.xs.read_watch()
        self.xs.acknowledge_watch(p[1])
        return p

    def notify(self):
        """Deliver one pending notification to every subscription.

        Notifications for paths ending in "@eid" are consumed but not
        delivered. Exceptions from reading the store propagate to the
        caller.
        """
        (path, token) = self.getNotification()
        if path.endswith("@eid"):
            return
        val = self.xs.read(path)
        # Iterate over a snapshot: a callback may unsubscribe (calling
        # delSubs) while we are dispatching, and mutating the list being
        # iterated would silently skip the next subscription.
        for subs in list(self.subscriptions):
            subs.notify(token, path, val)
class EventWatcher(Watcher):
    """Watcher for an event path; its watch token is the event name."""

    def __init__(self, store, path, event):
        Watcher.__init__(self, store, path)
        eid_path = getEventIdPath(event)
        self.event = event
        self.eidPath = eid_path
        # Seed the event id counter node with "0" when it is missing.
        if store.exists(eid_path):
            return
        store.write(eid_path, "0")

    def getEvent(self):
        """Return the event name this watcher was created for."""
        return self.event

    def getToken(self):
        """Return the watch token, which is the event name."""
        return self.event
class XenStore:
    """Convenience wrapper around an xs connection.

    Provides path helpers (ls, mkdirs, read, write, ...), path watches
    serviced by a background thread, and a simple event mechanism stored
    under "/event".

    Exceptions are bound via sys.exc_info() rather than in the except
    clause so the syntax does not depend on the Python version.
    """

    # Low-level connection, opened lazily by getxs().
    xs = None
    # Thread running watchMain(), or None when it is not running.
    watchThread = None
    # Last subscription id handed out; incremented before each use.
    subscription_id = 1

    def __init__(self):
        # subscription id -> Subscription
        self.subscriptions = {}
        # watched path -> Watcher
        self.watchers = {}
        self.write("/", "")

    def getxs(self):
        """Return the xs connection, opening it on first use.

        xs.open() is attempted up to 20 times with a one second pause
        after each failure; if all attempts fail the last error is raised.
        """
        if self.xs is None:
            last_error = None
            for _attempt in range(0, 20):
                try:
                    self.xs = xs.open()
                    last_error = None
                    break
                except SystemExit:
                    raise
                except Exception:
                    last_error = sys.exc_info()[1]
                    sys.stderr.write("Exception connecting to xenstored: %s\n"
                                     % (last_error,))
                    sys.stderr.write("Trying again...\n")
                    time.sleep(1)
            else:
                raise last_error

        #todo would like to reconnect if xs conn closes (e.g. daemon restart).
        return self.xs

    def dump(self, path="/", out=sys.stdout):
        """Pretty-print the subtree at `path` to `out` and return it.

        The returned value is a nested list of the form
        ['node', ['path', p], ['data', d], ['children', ...]] where the
        'data' and 'children' entries are only present when non-empty.
        Progress lines prefixed with 'dump>' are written to sys.stdout.
        """
        sys.stdout.write("dump> %s\n" % (path,))
        val = ['node']
        val.append(['path', path])
        data = self.read(path)
        if data:
            val.append(['data', data])
        children = ['children']
        for x in self.lsPaths(path):
            sys.stdout.write("dump> child= %s\n" % (x,))
            # Pass `out` down so descendants are printed to the same
            # stream as the root instead of always going to sys.stdout.
            children.append(self.dump(x, out=out))
        if len(children) > 1:
            val.append(children)
        prettyprint(val, out=out)
        return val

    def getPerms(self, path):
        """Return the permissions of `path` as reported by the connection."""
        return self.getxs().get_permissions(path)

    def ls(self, path="/"):
        """Return the names of the entries directly below `path`."""
        return self.getxs().ls(path)

    def lsPaths(self, path="/"):
        """Return the entries directly below `path`, joined onto `path`."""
        return [os.path.join(path, x) for x in self.ls(path)]

    def lsr(self, path="/", list=None):
        """Return `path` and every path below it, parents before children.

        `list`, when given, is the accumulator the paths are appended to
        (the parameter name is kept for callers passing it by keyword).
        """
        if list is None:
            list = []
        list.append(path)
        for x in self.lsPaths(path):
            # The recursive call appends x itself; appending it here as
            # well would list every non-root path twice.
            self.lsr(x, list=list)
        return list

    def rm(self, path):
        """Remove `path`, ignoring any error from the store (best effort)."""
        try:
            self.getxs().rm(path)
        except SystemExit:
            raise
        except Exception:
            pass

    def exists(self, path):
        """Return True if `path` can be listed, False on ENOENT.

        Any other RuntimeError from the store is propagated.
        """
        try:
            self.getxs().ls(path)
            return True
        except RuntimeError:
            if sys.exc_info()[1].args[0] == errno.ENOENT:
                return False
            raise

    def mkdirs(self, path):
        """Create `path` and any missing ancestors as empty nodes."""
        if self.exists(path):
            return
        current = "/"
        for part in path.split("/"):
            if part == "":
                continue
            current = os.path.join(current, part)
            if not self.exists(current):
                self.getxs().write(current, "", create=True)

    def read(self, path):
        """Return the data stored at `path`, or None on EISDIR.

        Any other RuntimeError from the store is propagated.
        """
        try:
            return self.getxs().read(path)
        except RuntimeError:
            if sys.exc_info()[1].args[0] == errno.EISDIR:
                return None
            raise

    def create(self, path, excl=False):
        """Create `path` with empty data."""
        self.write(path, "", create=True, excl=excl)

    def write(self, path, data, create=True, excl=False):
        """Write `data` to `path`, creating missing nodes first.

        mkdirs() is applied to the full path, so the node itself exists
        before the low-level write is issued.
        NOTE(review): confirm this interacts as intended with excl=True.
        """
        self.mkdirs(path)
        self.getxs().write(path, data, create=create, excl=excl)

    def begin(self, path):
        """Start a transaction on `path`."""
        self.getxs().transaction_start(path)

    def commit(self, abandon=False):
        """End the current transaction; abort it when `abandon` is true."""
        self.getxs().transaction_end(abort=abandon)

    def watch(self, path, fn):
        """Call `fn` on changes to `path`; returns a subscription id."""
        watcher = self.watchers.get(path)
        if not watcher:
            watcher = self.addWatcher(Watcher(self, path))
        return self.addSubscription(watcher, fn)

    def unwatch(self, sid):
        """Cancel the subscription `sid`; unknown ids are ignored.

        The watcher is dropped as well once it has stopped watching.
        """
        s = self.subscriptions.get(sid)
        if not s:
            return
        del self.subscriptions[s.sid]
        watcher = s.unwatch()
        if watcher and not watcher.watching():
            # The watcher may already be gone from the table; that is fine.
            self.watchers.pop(watcher.getPath(), None)

    def subscribe(self, event, fn):
        """Call `fn` for `event`; returns a subscription id."""
        path = getEventPath(event)
        watcher = self.watchers.get(path)
        if not watcher:
            watcher = self.addWatcher(EventWatcher(self, path, event))
        return self.addSubscription(watcher, fn)

    unsubscribe = unwatch

    def sendEvent(self, event, data):
        """Publish `data` as a new numbered node under the event's path.

        The event id stored in the "@eid" node is incremented (starting
        at 1 when it is missing or unreadable) and `data` is written to
        <event path>/<eid>.

        NOTE(review): the transaction begin/commit around this sequence
        was already commented out in the original, so the id update is
        not atomic; presumably concurrent senders could pick the same id.
        """
        eventPath = getEventPath(event)
        eidPath = getEventIdPath(event)
        self.mkdirs(eventPath)
        eid = 1
        if self.exists(eidPath):
            try:
                eid = int(self.read(eidPath)) + 1
            except Exception:
                # Unreadable counter: fall back to the initial id.
                pass
        self.write(eidPath, str(eid))
        self.write(os.path.join(eventPath, str(eid)), data)

    def addWatcher(self, watcher):
        """Register `watcher` under its path and start the watch thread."""
        self.watchers[watcher.getPath()] = watcher
        self.watchStart()
        return watcher

    def addSubscription(self, watcher, fn):
        """Create a subscription for `fn` on `watcher`; returns its id."""
        self.subscription_id += 1
        subs = Subscription(watcher.getPath(), fn, self.subscription_id)
        self.subscriptions[subs.sid] = subs
        subs.watch(watcher)
        return subs.sid

    def watchStart(self):
        """Start the daemon watch thread unless it is already running."""
        if self.watchThread:
            return
        self.watchThread = threading.Thread(name="Watcher",
                                            target=self.watchMain)
        self.watchThread.setDaemon(True)
        self.watchThread.start()

    def watchMain(self):
        """Watch thread body: select on the watchers and dispatch events.

        The loop ends when watchThread is cleared or no watchers remain;
        watchThread is reset to None on exit, whatever the reason.
        """
        try:
            while True:
                if self.watchThread is None:
                    return
                if not self.watchers:
                    return
                # Only select on watchers with an open connection: a watcher
                # is registered before its first subscription opens one, and
                # until then fileno() is -1, which select() rejects.
                rd = [w for w in self.watchers.values() if w.watching()]
                try:
                    (srd, _swr, _ser) = select.select(rd, [], [],
                                                      SELECT_TIMEOUT)
                    for watcher in srd:
                        watcher.notify()
                except (select.error, socket.error):
                    # Transient conditions are retried, anything else ends
                    # the thread. select() reports failures as select.error.
                    if sys.exc_info()[1].args[0] in (errno.EAGAIN,
                                                     errno.EINTR):
                        pass
                    else:
                        raise
        finally:
            self.watchThread = None
def getXenStore():
    """Return the module-wide XenStore instance, creating it on first use."""
    global xenstore
    try:
        return xenstore
    except NameError:
        # First call: the module-level name has not been bound yet.
        # Only NameError is expected here, so nothing else is swallowed.
        xenstore = XenStore()
        return xenstore
def sendEvent(event, val):
    """Send `val` as an event named `event` through the shared XenStore."""
    # getXenStore must be called: the function object itself has no
    # sendEvent attribute.
    getXenStore().sendEvent(event, val)
def subscribe(event, fn):
    """Subscribe `fn` to `event` on the shared XenStore.

    Returns whatever the store's subscribe() returns (the subscription id).
    """
    store = getXenStore()
    sid = store.subscribe(event, fn)
    return sid
def unsubscribe(sid):
    """Cancel the subscription `sid` on the shared XenStore."""
    store = getXenStore()
    store.unsubscribe(sid)
class XenNode:
    """Handle on one path in the shared XenStore.

    Methods taking a `path` argument interpret it relative to the node;
    an empty path refers to the node itself.
    """

    def __init__(self, path="/", create=True):
        """Bind to `path`, creating it when missing if `create` is true.

        Raises ValueError if the path does not exist and `create` is false.
        """
        self.store = getXenStore()
        self.path = path
        if not self.store.exists(path):
            if create:
                self.store.create(path)
            else:
                raise ValueError("path does not exist: '%s'" % path)

    def getStore(self):
        """Return the underlying store."""
        return self.store

    def relPath(self, path=""):
        """Return `path` resolved against this node's path.

        A single leading "/" on `path` is dropped so that the result
        always stays below the node.
        """
        if not path:
            return self.path
        if path.startswith("/"):
            path = path[1:]
        return os.path.join(self.path, path)

    def delete(self, path=""):
        """Remove the node at `path` (this node by default)."""
        self.store.rm(self.relPath(path))

    def exists(self, path=""):
        """Return whether the node at `path` exists in the store."""
        return self.store.exists(self.relPath(path))

    def getNode(self, path="", create=True):
        """Return a XenNode for `path`; the empty path returns this node."""
        if path == "":
            return self
        return XenNode(self.relPath(path=path), create=create)

    getChild = getNode

    def getData(self, path=""):
        """Return the data at `path`, or None if it cannot be read.

        Read errors are swallowed on purpose, but SystemExit is allowed
        to propagate, matching the error handling used elsewhere in this
        module.
        """
        path = self.relPath(path)
        try:
            return self.store.read(path)
        except SystemExit:
            raise
        except Exception:
            return None

    def setData(self, data, path=""):
        """Write `data` to the node at `path`."""
        return self.store.write(self.relPath(path), data)

    def getLock(self):
        """Placeholder; always returns None."""
        return None

    def lock(self, lockid):
        """Placeholder; always returns None."""
        return None

    def unlock(self, lockid):
        """Placeholder; always returns None."""
        return None

    def deleteChild(self, name):
        """Remove the child called `name`."""
        self.delete(name)

    def deleteChildren(self):
        """Remove every direct child of this node."""
        for name in self.ls():
            self.deleteChild(name)

    def getChildren(self):
        """Return a XenNode for each direct child of this node."""
        return [self.getNode(name) for name in self.ls()]

    def ls(self):
        """Return the names of this node's direct children."""
        return self.store.ls(self.path)

    def watch(self, fn, path=""):
        """Watch a path for changes. The path is relative
        to the node and defaults to the node itself.
        """
        return self.store.watch(self.relPath(path), fn)

    def unwatch(self, sid):
        """Cancel the watch identified by `sid`."""
        return self.store.unwatch(sid)

    def subscribe(self, event, fn):
        """Subscribe `fn` to `event`; returns the subscription id."""
        return self.store.subscribe(event, fn)

    def unsubscribe(self, sid):
        """Cancel the event subscription identified by `sid`."""
        self.store.unsubscribe(sid)

    def sendEvent(self, event, data):
        """Send `data` as an event named `event`."""
        return self.store.sendEvent(event, data)

    def __repr__(self):
        return "<XenNode %s>" % self.path