1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17  """ 
 18  This module contains IBB class that is the simple implementation of JEP-0047. 
 19  Note that this is just a transport for data. You have to negotiate data transfer before 
 20  (via StreamInitiation most probably). Unfortunately SI is not implemented yet. 
 21  """ 
 22   
 23  from protocol import * 
 24  from dispatcher import PlugIn 
 25  import base64 
 26   
 28      """ IBB used to transfer small-sized data chunk over estabilished xmpp connection. 
 29          Data is split into small blocks (by default 3000 bytes each), encoded as base 64 
 30          and sent to another entity that compiles these blocks back into the data chunk. 
 31          This is very inefficiend but should work under any circumstances. Note that 
 32          using IBB normally should be the last resort. 
 33      """ 
 35          """ Initialise internal variables. """ 
 36          PlugIn.__init__(self) 
 37          self.DBG_LINE='ibb' 
 38          self._exported_methods=[self.OpenStream] 
 39          self._streams={} 
 40          self._ampnode=Node(NS_AMP+' amp',payload=[Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),Node('rule',{'condition':'match-resource','value':'exact','action':'error'})]) 
  41   
 47   
 58   
 60          """ Handles opening of new incoming stream. Used internally. """ 
 61          """ 
 62  <iq type='set' 
 63      from='romeo@montague.net/orchard' 
 64      to='juliet@capulet.com/balcony' 
 65      id='inband_1'> 
 66    <open sid='mySID' 
 67          block-size='4096' 
 68          xmlns='http://jabber.org/protocol/ibb'/> 
 69  </iq> 
 70  """ 
 71          err=None 
 72          sid,blocksize=stanza.getTagAttr('open','sid'),stanza.getTagAttr('open','block-size') 
 73          self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s'%(sid,blocksize),'info') 
 74          try: blocksize=int(blocksize) 
 75          except: err=ERR_BAD_REQUEST 
 76          if not sid or not blocksize: err=ERR_BAD_REQUEST 
 77          elif sid in self._streams.keys(): err=ERR_UNEXPECTED_REQUEST 
 78          if err: rep=Error(stanza,err) 
 79          else: 
 80              self.DEBUG("Opening stream: id %s, block-size %s"%(sid,blocksize),'info') 
 81              rep=Protocol('iq',stanza.getFrom(),'result',stanza.getTo(),{'id':stanza.getID()}) 
 82              self._streams[sid]={'direction':'<'+str(stanza.getFrom()),'block-size':blocksize,'fp':open('/tmp/xmpp_file_'+sid,'w'),'seq':0,'syn_id':stanza.getID()} 
 83          conn.send(rep) 
  84   
 86          """ Start new stream. You should provide stream id 'sid', the endpoind jid 'to', 
 87              the file object containing info for send 'fp'. Also the desired blocksize can be specified. 
 88              Take into account that recommended stanza size is 4k and IBB uses base64 encoding 
 89              that increases size of data by 1/3.""" 
 90          if sid in self._streams.keys(): return 
 91          if not JID(to).getResource(): return 
 92          self._streams[sid]={'direction':'|>'+to,'block-size':blocksize,'fp':fp,'seq':0} 
 93          self._owner.RegisterCycleHandler(self.SendHandler) 
 94          syn=Protocol('iq',to,'set',payload=[Node(NS_IBB+' open',{'sid':sid,'block-size':blocksize})]) 
 95          self._owner.send(syn) 
 96          self._streams[sid]['syn_id']=syn.getID() 
 97          return self._streams[sid] 
  98   
100          """ Send next portion of data if it is time to do it. Used internally. """ 
101          self.DEBUG('SendHandler called','info') 
102          for sid in self._streams.keys(): 
103              stream=self._streams[sid] 
104              if stream['direction'][:2]=='|>': cont=1 
105              elif stream['direction'][0]=='>': 
106                  chunk=stream['fp'].read(stream['block-size']) 
107                  if chunk: 
108                      datanode=Node(NS_IBB+' data',{'sid':sid,'seq':stream['seq']},base64.encodestring(chunk)) 
109                      stream['seq']+=1 
110                      if stream['seq']==65536: stream['seq']=0 
111                      conn.send(Protocol('message',stream['direction'][1:],payload=[datanode,self._ampnode])) 
112                  else: 
113                      """ notify the other side about stream closing 
114                          notify the local user about sucessfull send 
115                          delete the local stream""" 
116                      conn.send(Protocol('iq',stream['direction'][1:],'set',payload=[Node(NS_IBB+' close',{'sid':sid})])) 
117                      conn.Event(self.DBG_LINE,'SUCCESSFULL SEND',stream) 
118                      del self._streams[sid] 
119                      self._owner.UnregisterCycleHandler(self.SendHandler) 
120   
121                      """ 
122  <message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'> 
123    <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'> 
124      qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ 
125      WpdWpR0uQsuJe7+vh3NWn59/gTc5MDlX8dS9p0ovStmNcyLhxVgmqS8ZKhsblVeu 
126      IpQ0JgavABqibJolc3BKrVtVV1igKiX/N7Pi8RtY1K18toaMDhdEfhBRzO/XB0+P 
127      AQhYlRjNacGcslkhXqNjK5Va4tuOAPy2n1Q8UUrHbUd0g+xJ9Bm0G0LZXyvCWyKH 
128      kuNEHFQiLuCY6Iv0myq6iX6tjuHehZlFSh80b5BVV9tNLwNR5Eqz1klxMhoghJOA 
129    </data> 
130    <amp xmlns='http://jabber.org/protocol/amp'> 
131      <rule condition='deliver-at' value='stored' action='error'/> 
132      <rule condition='match-resource' value='exact' action='error'/> 
133    </amp> 
134  </message> 
135  """ 
 136   
138          """ Receive next portion of incoming datastream and store it write 
139              it to temporary file. Used internally. 
140          """ 
141          sid,seq,data=stanza.getTagAttr('data','sid'),stanza.getTagAttr('data','seq'),stanza.getTagData('data') 
142          self.DEBUG('ReceiveHandler called sid->%s seq->%s'%(sid,seq),'info') 
143          try: seq=int(seq); data=base64.decodestring(data) 
144          except: seq=''; data='' 
145          err=None 
146          if not sid in self._streams.keys(): err=ERR_ITEM_NOT_FOUND 
147          else: 
148              stream=self._streams[sid] 
149              if not data: err=ERR_BAD_REQUEST 
150              elif seq<>stream['seq']: err=ERR_UNEXPECTED_REQUEST 
151              else: 
152                  self.DEBUG('Successfull receive sid->%s %s+%s bytes'%(sid,stream['fp'].tell(),len(data)),'ok') 
153                  stream['seq']+=1 
154                  stream['fp'].write(data) 
155          if err: 
156              self.DEBUG('Error on receive: %s'%err,'error') 
157              conn.send(Error(Iq(to=stanza.getFrom(),frm=stanza.getTo(),payload=[Node(NS_IBB+' close')]),err,reply=0)) 
 158   
160          """ Handle stream closure due to all data transmitted. 
161              Raise xmpppy event specifying successfull data receive. """ 
162          sid=stanza.getTagAttr('close','sid') 
163          self.DEBUG('StreamCloseHandler called sid->%s'%sid,'info') 
164          if sid in self._streams.keys(): 
165              conn.send(stanza.buildReply('result')) 
166              conn.Event(self.DBG_LINE,'SUCCESSFULL RECEIVE',self._streams[sid]) 
167              del self._streams[sid] 
168          else: conn.send(Error(stanza,ERR_ITEM_NOT_FOUND)) 
 169   
171          """ Handle stream closure due to all some error while receiving data. 
172              Raise xmpppy event specifying unsuccessfull data receive. """ 
173          syn_id=stanza.getID() 
174          self.DEBUG('StreamBrokenHandler called syn_id->%s'%syn_id,'info') 
175          for sid in self._streams.keys(): 
176              stream=self._streams[sid] 
177              if stream['syn_id']==syn_id: 
178                  if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream) 
179                  else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream) 
180                  del self._streams[sid] 
 181   
183          """ Handle remote side reply about is it agree or not to receive our datastream. 
184              Used internally. Raises xmpppy event specfiying if the data transfer 
185              is agreed upon.""" 
186          syn_id=stanza.getID() 
187          self.DEBUG('StreamOpenReplyHandler called syn_id->%s'%syn_id,'info') 
188          for sid in self._streams.keys(): 
189              stream=self._streams[sid] 
190              if stream['syn_id']==syn_id: 
191                  if stanza.getType()=='error': 
192                      if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream) 
193                      else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream) 
194                      del self._streams[sid] 
195                  elif stanza.getType()=='result': 
196                      if stream['direction'][0]=='|': 
197                          stream['direction']=stream['direction'][1:] 
198                          conn.Event(self.DBG_LINE,'STREAM COMMITTED',stream) 
199                      else: conn.send(Error(stanza,ERR_UNEXPECTED_REQUEST)) 
  200