Package xmpp :: Module session
[hide private]
[frames] | no frames]

Source Code for Module xmpp.session

  1  ## 
  2  ##   XMPP server 
  3  ## 
  4  ##   Copyright (C) 2004 Alexey "Snake" Nezhdanov 
  5  ## 
  6  ##   This program is free software; you can redistribute it and/or modify 
  7  ##   it under the terms of the GNU General Public License as published by 
  8  ##   the Free Software Foundation; either version 2, or (at your option) 
  9  ##   any later version. 
 10  ## 
 11  ##   This program is distributed in the hope that it will be useful, 
 12  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  ##   GNU General Public License for more details. 
 15   
 16  __version__="$Id" 
 17   
 18  """ 
 19  When your handler is called it is getting the session instance as the first argument. 
 20  This is the difference from xmpppy 0.1 where you got the "Client" instance. 
 21  With Session class you can have "multi-session" client instead of having 
 22  one client for each connection. Is is specifically important when you are 
 23  writing the server. 
 24  """ 
 25   
 26  from protocol import * 
 27   
 28  # Transport-level flags 
 29  SOCKET_UNCONNECTED  =0 
 30  SOCKET_ALIVE        =1 
 31  SOCKET_DEAD         =2 
 32  # XML-level flags 
 33  STREAM__NOT_OPENED =1 
 34  STREAM__OPENED     =2 
 35  STREAM__CLOSING    =3 
 36  STREAM__CLOSED     =4 
 37  # XMPP-session flags 
 38  SESSION_NOT_AUTHED =1 
 39  SESSION_AUTHED     =2 
 40  SESSION_BOUND      =3 
 41  SESSION_OPENED     =4 
 42  SESSION_CLOSED     =5 
 43   
44 -class Session:
45 """ 46 The Session class instance is used for storing all session-related info like 47 credentials, socket/xml stream/session state flags, roster items (in case of 48 client type connection) etc. 49 Session object have no means of discovering is any info is ready to be read. 50 Instead you should use poll() (recomended) or select() methods for this purpose. 51 Session can be one of two types: 'server' and 'client'. 'server' session handles 52 inbound connection and 'client' one used to create an outbound one. 53 Session instance have multitude of internal attributes. The most imporant is the 'peer' one. 54 It is set once the peer is authenticated (client). 55 """
56 - def __init__(self,socket,owner,xmlns=None,peer=None):
57 """ When the session is created it's type (client/server) is determined from the beginning. 58 socket argument is the pre-created socket-like object. 59 It must have the following methods: send, recv, fileno, close. 60 owner is the 'master' instance that have Dispatcher plugged into it and generally 61 will take care about all session events. 62 xmlns is the stream namespace that will be used. Client must set this argument 63 If server sets this argument than stream will be dropped if opened with some another namespace. 64 peer is the name of peer instance. This is the flag that differentiates client session from 65 server session. Client must set it to the name of the server that will be connected, server must 66 leave this argument alone. 67 """ 68 self.xmlns=xmlns 69 if peer: 70 self.TYP='client' 71 self.peer=peer 72 self._socket_state=SOCKET_UNCONNECTED 73 else: 74 self.TYP='server' 75 self.peer=None 76 self._socket_state=SOCKET_ALIVE 77 self._sock=socket 78 self._send=socket.send 79 self._recv=socket.recv 80 self.fileno=socket.fileno 81 self._registered=0 82 83 self.Dispatcher=owner.Dispatcher 84 self.DBG_LINE='session' 85 self.DEBUG=owner.Dispatcher.DEBUG 86 self._expected={} 87 self._owner=owner 88 if self.TYP=='server': self.ID=`random.random()`[2:] 89 else: self.ID=None 90 91 self.sendbuffer='' 92 self._stream_pos_queued=None 93 self._stream_pos_sent=0 94 self.deliver_key_queue=[] 95 self.deliver_queue_map={} 96 self.stanza_queue=[] 97 98 self._session_state=SESSION_NOT_AUTHED 99 self.waiting_features=[] 100 for feature in [NS_TLS,NS_SASL,NS_BIND,NS_SESSION]: 101 if feature in owner.features: self.waiting_features.append(feature) 102 self.features=[] 103 self.feature_in_process=None 104 self.slave_session=None 105 self.StartStream()
106
107 - def StartStream(self):
108 """ This method is used to initialise the internal xml expat parser 109 and to send initial stream header (in case of client connection). 110 Should be used after initial connection and after every stream restart.""" 111 self._stream_state=STREAM__NOT_OPENED 112 self.Stream=simplexml.NodeBuilder() 113 self.Stream._dispatch_depth=2 114 self.Stream.dispatch=self._dispatch 115 self.Parse=self.Stream.Parse 116 self.Stream.stream_footer_received=self._stream_close 117 if self.TYP=='client': 118 self.Stream.stream_header_received=self._catch_stream_id 119 self._stream_open() 120 else: 121 self.Stream.stream_header_received=self._stream_open
122
123 - def receive(self):
124 """ Reads all pending incoming data. 125 Raises IOError on disconnection. 126 Blocks until at least one byte is read.""" 127 try: received = self._recv(10240) 128 except: received = '' 129 130 if len(received): # length of 0 means disconnect 131 self.DEBUG(`self.fileno()`+' '+received,'got') 132 else: 133 self.DEBUG('Socket error while receiving data','error') 134 self.set_socket_state(SOCKET_DEAD) 135 raise IOError("Peer disconnected") 136 return received
137
138 - def sendnow(self,chunk):
139 """ Put chunk into "immidiatedly send" queue. 140 Should only be used for auth/TLS stuff and like. 141 If you just want to shedule regular stanza for delivery use enqueue method. 142 """ 143 if isinstance(chunk,Node): chunk = chunk.__str__().encode('utf-8') 144 elif type(chunk)==type(u''): chunk = chunk.encode('utf-8') 145 self.enqueue(chunk)
146
147 - def enqueue(self,stanza):
148 """ Takes Protocol instance as argument. 149 Puts stanza into "send" fifo queue. Items into the send queue are hold until 150 stream authenticated. After that this method is effectively the same as "sendnow" method.""" 151 if isinstance(stanza,Protocol): 152 self.stanza_queue.append(stanza) 153 else: self.sendbuffer+=stanza 154 if self._socket_state>=SOCKET_ALIVE: self.push_queue()
155
156 - def push_queue(self,failreason=ERR_RECIPIENT_UNAVAILABLE):
157 """ If stream is authenticated than move items from "send" queue to "immidiatedly send" queue. 158 Else if the stream is failed then return all queued stanzas with error passed as argument. 159 Otherwise do nothing.""" 160 # If the stream authed - convert stanza_queue into sendbuffer and set the checkpoints 161 162 if self._stream_state>=STREAM__CLOSED or self._socket_state>=SOCKET_DEAD: # the stream failed. Return all stanzas that are still waiting for delivery. 163 self._owner.deactivatesession(self) 164 for key in self.deliver_key_queue: # Not sure. May be I 165 self._dispatch(Error(self.deliver_queue_map[key],failreason),trusted=1) # should simply re-dispatch it? 166 for stanza in self.stanza_queue: # But such action can invoke 167 self._dispatch(Error(stanza,failreason),trusted=1) # Infinite loops in case of S2S connection... 168 self.deliver_queue_map,self.deliver_key_queue,self.stanza_queue={},[],[] 169 return 170 elif self._session_state>=SESSION_AUTHED: # FIXME! 171 #### LOCK_QUEUE 172 for stanza in self.stanza_queue: 173 txt=stanza.__str__().encode('utf-8') 174 self.sendbuffer+=txt 175 self._stream_pos_queued+=len(txt) # should be re-evaluated for SSL connection. 176 self.deliver_queue_map[self._stream_pos_queued]=stanza # position of the stream when stanza will be successfully and fully sent 177 self.deliver_key_queue.append(self._stream_pos_queued) 178 self.stanza_queue=[]
179 #### UNLOCK_QUEUE 180
181 - def flush_queue(self):
182 """ Put the "immidiatedly send" queue content on the wire. Blocks until at least one byte sent.""" 183 if self.sendbuffer: 184 try: 185 # LOCK_QUEUE 186 sent=self._send(self.sendbuffer) 187 except: 188 # UNLOCK_QUEUE 189 self.set_socket_state(SOCKET_DEAD) 190 self.DEBUG("Socket error while sending data",'error') 191 return self.terminate_stream() 192 self.DEBUG(`self.fileno()`+' '+self.sendbuffer[:sent],'sent') 193 self._stream_pos_sent+=sent 194 self.sendbuffer=self.sendbuffer[sent:] 195 self._stream_pos_delivered=self._stream_pos_sent # Should be acquired from socket somehow. Take SSL into account. 196 while self.deliver_key_queue and self._stream_pos_delivered>self.deliver_key_queue[0]: 197 del self.deliver_queue_map[self.deliver_key_queue[0]] 198 self.deliver_key_queue.remove(self.deliver_key_queue[0])
199 # UNLOCK_QUEUE 200
201 - def _dispatch(self,stanza,trusted=0):
202 """ This is callback that is used to pass the received stanza forth to owner's dispatcher 203 _if_ the stream is authorised. Otherwise the stanza is just dropped. 204 The 'trusted' argument is used to emulate stanza receive. 205 This method is used internally. 206 """ 207 self._owner.packets+=1 208 if self._stream_state==STREAM__OPENED or trusted: # if the server really should reject all stanzas after he is closed stream (himeself)? 209 self.DEBUG(stanza.__str__(),'dispatch') 210 stanza.trusted=trusted 211 return self.Dispatcher.dispatch(stanza,self)
212
213 - def _catch_stream_id(self,ns=None,tag='stream',attrs={}):
214 """ This callback is used to detect the stream namespace of incoming stream. Used internally. """ 215 if not attrs.has_key('id') or not attrs['id']: 216 return self.terminate_stream(STREAM_INVALID_XML) 217 self.ID=attrs['id'] 218 if not attrs.has_key('version'): self._owner.Dialback(self)
219
220 - def _stream_open(self,ns=None,tag='stream',attrs={}):
221 """ This callback is used to handle opening stream tag of the incoming stream. 222 In the case of client session it just make some validation. 223 Server session also sends server headers and if the stream valid the features node. 224 Used internally. """ 225 text='<?xml version="1.0" encoding="utf-8"?>\n<stream:stream' 226 if self.TYP=='client': 227 text+=' to="%s"'%self.peer 228 else: 229 text+=' id="%s"'%self.ID 230 if not attrs.has_key('to'): text+=' from="%s"'%self._owner.servernames[0] 231 else: text+=' from="%s"'%attrs['to'] 232 if attrs.has_key('xml:lang'): text+=' xml:lang="%s"'%attrs['xml:lang'] 233 if self.xmlns: xmlns=self.xmlns 234 else: xmlns=NS_SERVER 235 text+=' xmlns:db="%s" xmlns:stream="%s" xmlns="%s"'%(NS_DIALBACK,NS_STREAMS,xmlns) 236 if attrs.has_key('version') or self.TYP=='client': text+=' version="1.0"' 237 self.sendnow(text+'>') 238 self.set_stream_state(STREAM__OPENED) 239 if self.TYP=='client': return 240 if tag<>'stream': return self.terminate_stream(STREAM_INVALID_XML) 241 if ns<>NS_STREAMS: return self.terminate_stream(STREAM_INVALID_NAMESPACE) 242 if self.Stream.xmlns<>self.xmlns: return self.terminate_stream(STREAM_BAD_NAMESPACE_PREFIX) 243 if not attrs.has_key('to'): return self.terminate_stream(STREAM_IMPROPER_ADDRESSING) 244 if attrs['to'] not in self._owner.servernames: return self.terminate_stream(STREAM_HOST_UNKNOWN) 245 self.ourname=attrs['to'].lower() 246 if self.TYP=='server' and attrs.has_key('version'): 247 # send features 248 features=Node('stream:features') 249 if NS_TLS in self.waiting_features: 250 features.NT.starttls.setNamespace(NS_TLS) 251 features.T.starttls.NT.required 252 if NS_SASL in self.waiting_features: 253 features.NT.mechanisms.setNamespace(NS_SASL) 254 for mec in self._owner.SASL.mechanisms: 255 features.T.mechanisms.NT.mechanism=mec 256 else: 257 if NS_BIND in self.waiting_features: features.NT.bind.setNamespace(NS_BIND) 258 if NS_SESSION in self.waiting_features: features.NT.session.setNamespace(NS_SESSION) 259 self.sendnow(features)
260
261 - def feature(self,feature):
262 """ Declare some stream feature as activated one. """ 263 if feature not in self.features: self.features.append(feature) 264 self.unfeature(feature)
265
266 - def unfeature(self,feature):
267 """ Declare some feature as illegal. Illegal features can not be used. 268 Example: BIND feature becomes illegal after Non-SASL auth. """ 269 if feature in self.waiting_features: self.waiting_features.remove(feature)
270
271 - def _stream_close(self,unregister=1):
272 """ Write the closing stream tag and destroy the underlaying socket. Used internally. """ 273 if self._stream_state>=STREAM__CLOSED: return 274 self.set_stream_state(STREAM__CLOSING) 275 self.sendnow('</stream:stream>') 276 self.set_stream_state(STREAM__CLOSED) 277 self.push_queue() # decompose queue really since STREAM__CLOSED 278 self._owner.flush_queues() 279 if unregister: self._owner.unregistersession(self) 280 self._destroy_socket()
281
282 - def terminate_stream(self,error=None,unregister=1):
283 """ Notify the peer about stream closure. 284 Ensure that xmlstream is not brokes - i.e. if the stream isn't opened yet - 285 open it before closure. 286 If the error condition is specified than create a stream error and send it along with 287 closing stream tag. 288 Emulate receiving 'unavailable' type presence just before stream closure. 289 """ 290 if self._stream_state>=STREAM__CLOSING: return 291 if self._stream_state<STREAM__OPENED: 292 self.set_stream_state(STREAM__CLOSING) 293 self._stream_open() 294 else: 295 self.set_stream_state(STREAM__CLOSING) 296 p=Presence(typ='unavailable') 297 p.setNamespace(NS_CLIENT) 298 self._dispatch(p,trusted=1) 299 if error: 300 if isinstance(error,Node): self.sendnow(error) 301 else: self.sendnow(ErrorNode(error)) 302 self._stream_close(unregister=unregister) 303 if self.slave_session: 304 self.slave_session.terminate_stream(STREAM_REMOTE_CONNECTION_FAILED)
305
306 - def _destroy_socket(self):
307 """ Break cyclic dependancies to let python's GC free memory right now.""" 308 self.Stream.dispatch=None 309 self.Stream.stream_footer_received=None 310 self.Stream.stream_header_received=None 311 self.Stream.destroy() 312 self._sock.close() 313 self.set_socket_state(SOCKET_DEAD)
314
315 - def start_feature(self,f):
316 """ Declare some feature as "negotiating now" to prevent other features from start negotiating. """ 317 if self.feature_in_process: raise Exception("Starting feature %s over %s !"%(f,self.feature_in_process)) 318 self.feature_in_process=f
319
320 - def stop_feature(self,f):
321 """ Declare some feature as "negotiated" to allow other features start negotiating. """ 322 if self.feature_in_process<>f: raise Exception("Stopping feature %s instead of %s !"%(f,self.feature_in_process)) 323 self.feature_in_process=None
324
325 - def set_socket_state(self,newstate):
326 """ Change the underlaying socket state. 327 Socket starts with SOCKET_UNCONNECTED state 328 and then proceeds (possibly) to SOCKET_ALIVE 329 and then to SOCKET_DEAD """ 330 if self._socket_state<newstate: self._socket_state=newstate
331
332 - def set_session_state(self,newstate):
333 """ Change the session state. 334 Session starts with SESSION_NOT_AUTHED state 335 and then comes through 336 SESSION_AUTHED, SESSION_BOUND, SESSION_OPENED and SESSION_CLOSED states. 337 """ 338 if self._session_state<newstate: 339 if self._session_state<SESSION_AUTHED and \ 340 newstate>=SESSION_AUTHED: self._stream_pos_queued=self._stream_pos_sent 341 self._session_state=newstate
342
343 - def set_stream_state(self,newstate):
344 """ Change the underlaying XML stream state 345 Stream starts with STREAM__NOT_OPENED and then proceeds with 346 STREAM__OPENED, STREAM__CLOSING and STREAM__CLOSED states. 347 Note that some features (like TLS and SASL) 348 requires stream re-start so this state can have non-linear changes. """ 349 if self._stream_state<newstate: self._stream_state=newstate
350