1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Main xmpppy mechanism. Provides library with methods to assign different handlers
19 to different XMPP stanzas.
20 Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines time that
21 Dispatcher.SendAndWaitForResponce method will wait for reply stanza before giving up.
22 """
23
24 import simplexml,time,sys,random
25 from protocol import *
26 from client import PlugIn
27
28 DefaultTimeout=25
29 ID=0
30 SALT=random.randint(1,100000)
31
33 """ Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
34 Can be plugged out/in to restart these headers (used for SASL f.e.). """
36 PlugIn.__init__(self)
37 DBG_LINE='dispatcher'
38 self.handlers={}
39 self._expected={}
40 self._defaultHandler=None
41 self._pendingExceptions=[]
42 self._eventHandler=None
43 self._cycleHandlers=[]
44 self._exported_methods=[self.Process,self.RegisterHandler,self.RegisterDefaultHandler,\
45 self.RegisterEventHandler,self.UnregisterCycleHandler,self.RegisterCycleHandler,\
46 self.RegisterHandlerOnce,self.UnregisterHandler,self.RegisterProtocol,\
47 self.WaitForResponse,self.SendAndWaitForResponse,self.send,self.disconnect,\
48 self.SendAndCallForResponse, ]
49
51 """ Return set of user-registered callbacks in it's internal format.
52 Used within the library to carry user handlers set over Dispatcher replugins. """
53 return self.handlers
55 """ Restores user-registered callbacks structure from dump previously obtained via dumpHandlers.
56 Used within the library to carry user handlers set over Dispatcher replugins. """
57 self.handlers=handlers
58
69
71 """ Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally."""
72 self._init()
73 for method in self._old_owners_methods:
74 if method.__name__=='send': self._owner_send=method; break
75 self._owner.lastErrNode=None
76 self._owner.lastErr=None
77 self._owner.lastErrCode=None
78 self.StreamInit()
79
81 """ Prepares instance to be destructed. """
82 self.Stream.dispatch=None
83 self.Stream.DEBUG=None
84 self.Stream.features=None
85 self.Stream.destroy()
86
102
104 if ns<>NS_STREAMS or tag<>'stream':
105 raise ValueError('Incorrect stream start: (%s,%s). Terminating.'%(tag,ns))
106
108 """ Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time.
109 Returns:
110 1) length of processed data if some data were processed;
111 2) '0' string if no data were processed but link is alive;
112 3) 0 (zero) if underlying connection is closed.
113 Take note that in case of disconnection detect during Process() call
114 disconnect handlers are called automatically.
115 """
116 for handler in self._cycleHandlers: handler(self)
117 if len(self._pendingExceptions) > 0:
118 _pendingException = self._pendingExceptions.pop()
119 raise _pendingException[0], _pendingException[1], _pendingException[2]
120 if self._owner.Connection.pending_data(timeout):
121 try: data=self._owner.Connection.receive()
122 except IOError: return
123 self.Stream.Parse(data)
124 if len(self._pendingExceptions) > 0:
125 _pendingException = self._pendingExceptions.pop()
126 raise _pendingException[0], _pendingException[1], _pendingException[2]
127 if data: return len(data)
128 return '0'
129
131 """ Creates internal structures for newly registered namespace.
132 You can register handlers for this namespace afterwards. By default one namespace
133 already registered (jabber:client or jabber:component:accept depending on context. """
134 self.DEBUG('Registering namespace "%s"'%xmlns,order)
135 self.handlers[xmlns]={}
136 self.RegisterProtocol('unknown',Protocol,xmlns=xmlns)
137 self.RegisterProtocol('default',Protocol,xmlns=xmlns)
138
140 """ Used to declare some top-level stanza name to dispatcher.
141 Needed to start registering handlers for such stanzas.
142 Iq, message and presence protocols are registered by default. """
143 if not xmlns: xmlns=self._owner.defaultNamespace
144 self.DEBUG('Registering protocol "%s" as %s(%s)'%(tag_name,Proto,xmlns), order)
145 self.handlers[xmlns][tag_name]={type:Proto, 'default':[]}
146
148 """ Register handler for processing all stanzas for specified namespace. """
149 self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, system)
150
151 - def RegisterHandler(self,name,handler,typ='',ns='',xmlns=None, makefirst=0, system=0):
152 """Register user callback as stanzas handler of declared type. Callback must take
153 (if chained, see later) arguments: dispatcher instance (for replying), incomed
154 return of previous handlers.
155 The callback must raise xmpp.NodeProcessed just before return if it want preven
156 callbacks to be called with the same stanza as argument _and_, more importantly
157 library from returning stanza to sender with error set (to be enabled in 0.2 ve
158 Arguments:
159 "name" - name of stanza. F.e. "iq".
160 "handler" - user callback.
161 "typ" - value of stanza's "type" attribute. If not specified any value match
162 "ns" - namespace of child that stanza must contain.
163 "chained" - chain together output of several handlers.
164 "makefirst" - insert handler in the beginning of handlers list instead of
165 adding it to the end. Note that more common handlers (i.e. w/o "typ" and "
166 will be called first nevertheless.
167 "system" - call handler even if NodeProcessed Exception were raised already.
168 """
169 if not xmlns: xmlns=self._owner.defaultNamespace
170 self.DEBUG('Registering handler %s for "%s" type->%s ns->%s(%s)'%(handler,name,typ,ns,xmlns), 'info')
171 if not typ and not ns: typ='default'
172 if not self.handlers.has_key(xmlns): self.RegisterNamespace(xmlns,'warn')
173 if not self.handlers[xmlns].has_key(name): self.RegisterProtocol(name,Protocol,xmlns,'warn')
174 if not self.handlers[xmlns][name].has_key(typ+ns): self.handlers[xmlns][name][typ+ns]=[]
175 if makefirst: self.handlers[xmlns][name][typ+ns].insert(0,{'func':handler,'system':system})
176 else: self.handlers[xmlns][name][typ+ns].append({'func':handler,'system':system})
177
179 """ Unregister handler after first call (not implemented yet). """
180 if not xmlns: xmlns=self._owner.defaultNamespace
181 self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
182
184 """ Unregister handler. "typ" and "ns" must be specified exactly the same as with registering."""
185 if not xmlns: xmlns=self._owner.defaultNamespace
186 if not self.handlers.has_key(xmlns): return
187 if not typ and not ns: typ='default'
188 for pack in self.handlers[xmlns][name][typ+ns]:
189 if handler==pack['func']: break
190 else: pack=None
191 try: self.handlers[xmlns][name][typ+ns].remove(pack)
192 except ValueError: pass
193
195 """ Specify the handler that will be used if no NodeProcessed exception were raised.
196 This is returnStanzaHandler by default. """
197 self._defaultHandler=handler
198
200 """ Register handler that will process events. F.e. "FILERECEIVED" event. """
201 self._eventHandler=handler
202
207
217
219 """ Register handler that will be called on every Dispatcher.Process() call. """
220 if handler not in self._cycleHandlers: self._cycleHandlers.append(handler)
221
223 """ Unregister handler that will is called on every Dispatcher.Process() call."""
224 if handler in self._cycleHandlers: self._cycleHandlers.remove(handler)
225
226 - def Event(self,realm,event,data):
227 """ Raise some event. Takes three arguments:
228 1) "realm" - scope of event. Usually a namespace.
229 2) "event" - the event itself. F.e. "SUCESSFULL SEND".
230 3) data that comes along with event. Depends on event."""
231 if self._eventHandler: self._eventHandler(realm,event,data)
232
233 - def dispatch(self,stanza,session=None,direct=0):
234 """ Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
235 Called internally. """
236 if not session: session=self
237 session.Stream._mini_dom=None
238 name=stanza.getName()
239
240 if not direct and self._owner._route:
241 if name == 'route':
242 if stanza.getAttr('error') == None:
243 if len(stanza.getChildren()) == 1:
244 stanza = stanza.getChildren()[0]
245 name=stanza.getName()
246 else:
247 for each in stanza.getChildren():
248 self.dispatch(each,session,direct=1)
249 return
250 elif name == 'presence':
251 return
252 elif name in ('features','bind'):
253 pass
254 else:
255 raise UnsupportedStanzaType(name)
256
257 if name=='features': session.Stream.features=stanza
258
259 xmlns=stanza.getNamespace()
260 if not self.handlers.has_key(xmlns):
261 self.DEBUG("Unknown namespace: " + xmlns,'warn')
262 xmlns='unknown'
263 if not self.handlers[xmlns].has_key(name):
264 self.DEBUG("Unknown stanza: " + name,'warn')
265 name='unknown'
266 else:
267 self.DEBUG("Got %s/%s stanza"%(xmlns,name), 'ok')
268
269 if stanza.__class__.__name__=='Node': stanza=self.handlers[xmlns][name][type](node=stanza)
270
271 typ=stanza.getType()
272 if not typ: typ=''
273 stanza.props=stanza.getProperties()
274 ID=stanza.getID()
275
276 session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s"%(name,typ,stanza.props,ID),'ok')
277
278 list=['default']
279 if self.handlers[xmlns][name].has_key(typ): list.append(typ)
280 for prop in stanza.props:
281 if self.handlers[xmlns][name].has_key(prop): list.append(prop)
282 if typ and self.handlers[xmlns][name].has_key(typ+prop): list.append(typ+prop)
283
284 chain=self.handlers[xmlns]['default']['default']
285 for key in list:
286 if key: chain = chain + self.handlers[xmlns][name][key]
287
288 output=''
289 if session._expected.has_key(ID):
290 user=0
291 if type(session._expected[ID])==type(()):
292 cb,args=session._expected[ID]
293 del session._expected[ID]
294 session.DEBUG("Expected stanza arrived. Callback %s(%s) found!"%(cb,args),'ok')
295 try: cb(session,stanza,**args)
296 except Exception, typ:
297 if typ.__class__.__name__<>'NodeProcessed': raise
298 else:
299 session.DEBUG("Expected stanza arrived!",'ok')
300 session._expected[ID]=stanza
301 else: user=1
302 for handler in chain:
303 if user or handler['system']:
304 try:
305 handler['func'](session,stanza)
306 except Exception, typ:
307 if typ.__class__.__name__<>'NodeProcessed':
308 self._pendingExceptions.insert(0, sys.exc_info())
309 return
310 user=0
311 if user and self._defaultHandler: self._defaultHandler(session,stanza)
312
314 """ Block and wait until stanza with specific "id" attribute will come.
315 If no such stanza is arrived within timeout, return None.
316 If operation failed for some reason then owner's attributes
317 lastErrNode, lastErr and lastErrCode are set accordingly. """
318 self._expected[ID]=None
319 has_timed_out=0
320 abort_time=time.time() + timeout
321 self.DEBUG("Waiting for ID:%s with timeout %s..." % (ID,timeout),'wait')
322 while not self._expected[ID]:
323 if not self.Process(0.04):
324 self._owner.lastErr="Disconnect"
325 return None
326 if time.time() > abort_time:
327 self._owner.lastErr="Timeout"
328 return None
329 response=self._expected[ID]
330 del self._expected[ID]
331 if response.getErrorCode():
332 self._owner.lastErrNode=response
333 self._owner.lastErr=response.getError()
334 self._owner.lastErrCode=response.getErrorCode()
335 return response
336
338 """ Put stanza on the wire and wait for recipient's response to it. """
339 return self.WaitForResponse(self.send(stanza),timeout)
340
342 """ Put stanza on the wire and call back when recipient replies.
343 Additional callback arguments can be specified in args. """
344 self._expected[self.send(stanza)]=(func,args)
345
346 - def send(self,stanza):
347 """ Serialise stanza and put it on the wire. Assign an unique ID to it before send.
348 Returns assigned ID."""
349 if type(stanza) in [type(''), type(u'')]: return self._owner_send(stanza)
350 if not isinstance(stanza,Protocol): _ID=None
351 elif not stanza.getID():
352 global ID
353 ID+=1
354 _ID="%s_#%s#-%s" % (self._owner.User, SALT, `ID`)
355 stanza.setID(_ID)
356 else: _ID=stanza.getID()
357 if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from',self._owner._registered_name)
358 if self._owner._route and stanza.getName()!='bind':
359 to=self._owner.Server
360 if stanza.getTo() and stanza.getTo().getDomain():
361 to=stanza.getTo().getDomain()
362 frm=stanza.getFrom()
363 if frm.getDomain():
364 frm=frm.getDomain()
365 route=Protocol('route',to=to,frm=frm,payload=[stanza])
366 stanza=route
367 stanza.setNamespace(self._owner.Namespace)
368 stanza.setParent(self._metastream)
369 self._owner_send(stanza)
370 return _ID
371
373 """ Send a stream terminator and and handle all incoming stanzas before stream closure. """
374 self._owner_send('</stream:stream>')
375 while self.Process(1): pass
376