I2P SAM Server and Client @file jython/src/i2psam.py imports globals I2CP Interface Classes class JavaWrapper class I2PDestination __init__ Exporting Methods toBin toBinFile toBinPrivate toBinFilePrivate toBase64 toBase64Private toBase64File toBase64FilePrivate Importing Methods fromBin fromBinFile fromBinPrivate fromBinFilePrivate fromBase64 fromBase64File fromBase64Private fromBase64PrivateFile Signature Methods sign verify Sanity Methods hasPrivate class I2PClient __init__ createDestination createSession class I2PSession attributes __init__ sendMessage numMessages getMessage setSessionListener destroySession CALLBACKS on_message on_abuse on_disconnected on_error class I2PSessionListener Streaming Interface Classes class I2PSocket attributes __init__ bind listen accept connect recv send available close _createSockmgr I2P SAM Server class I2PSamServer attributes __init__ run finish_request samAllocId class I2PSamClientHandler handle on_genkeys on_createsession on_destroysession on_send on_receive on_HELLO on_SESSION on_SESSION_CREATE on_STREAM on_DATAGRAM on_RAW on_NAMING on_DEST on_message threadSocketListener threadSocketReceiver samParse samSend samCreateArgsList _sendbytes _recvbytes Exceptions Functions shahash base64enc base64dec str2bytearray bytearray2str byteoutstream2str dict2props takeKey log logException Tests testdests testsigs testsession testsocket usage main MAINLINE @file python/src/i2psamclient.py imports globals exceptions class I2PSamClient attributes __init__ createSession destroySession send receive SAM methods samHello samSessionCreate samDestGenerate samRawSend samRawCheck samRawReceive samDatagramSend samDatagramCheck samDatagramReceive samStreamConnect samStreamAccept samStreamSend samStreamClose samNamingLookup samParse samSend samCreateArgsList Receiver Side threadRx on_HELLO on_SESSION on_STREAM on_DATAGRAM on_RAW on_NAMING on_DEST Utility Methods samAllocId _recvline _recvbytes _sendbytes _sendline class I2PSAMStream __init__ send recv readline close __del__ _notifyIncomingData class I2PRemoteSession __init__ send recv destroy Functions log logException demoNAMING demoRAW demoDATAGRAM demoSTREAM demoSTREAM_thread demo MAINLINE @first #!/usr/bin/env jython r""" Implements I2P SAM Server. (refer U{http://drupal.i2p.net/node/view/144}) Also contains useful classes for jython programs, which wrap the I2P java classes into more python-compatible paradigms. If you run this module (or the i2psam.jar file created from it) without arguments, it'll run an I2P SAM server bridge, listening on port 7656. The file i2psamclient.py contains python client classes and a demo program. Latest vers of this file is available from U{http://www.freenet.org.nz/i2p/i2psam.py} Latest epydoc-generated doco at U{http://www.freenet.org.nz/i2p/i2pjyDoc} The i2psam.jar file is built from this module with the following command (requires jython and java 1.4.x+ to be installed):: CLASSPATH=/path/to/i2p.jar:/path/to/mstreaming.jar \ jythonc -jar i2psam.jar --all -A net.invisiblenet i2psam.py """ @others # python imports import sys, os, time, Queue, thread, threading, StringIO, traceback, getopt from SocketServer import ThreadingTCPServer, StreamRequestHandler # java imports import java # i2p-specific imports import net.i2p import net.i2p.client # to shut up epydoc #import net.i2p.client.I2PClient #import net.i2p.client.I2PClientFactory #import net.i2p.client.I2PSessionListener import net.i2p.client.naming import net.i2p.client.streaming import net.i2p.crypto import net.i2p.data # handy shorthand refs i2p = net.i2p jI2PClient = i2p.client.I2PClient # import my own helper hack module #import I2PHelper clientFactory = i2p.client.I2PClientFactory #i2phelper = I2PHelper() PROP_RELIABILITY_BEST_EFFORT = i2p.client.I2PClient.PROP_RELIABILITY_BEST_EFFORT PROP_RELIABILITY_GUARANTEED = i2p.client.I2PClient.PROP_RELIABILITY_GUARANTEED version = "0.1.0" # host/port that our socketserver listens on i2psamhost = "127.0.0.1" i2psamport = 7656 # host/port that I2P's I2CP listens on i2cpHost = "127.0.0.1" i2cpPort = 7654 #print "i2cpPort=%s" % repr(i2cpPort) # ------------------------------------------ # logging settings # 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful verbosity = 2 # change to a filename to log there instead logfile = sys.stdout # when set to 1, and when logfile != sys.stdout, log msgs are written # both to logfile and console stdout log2console = 1 # don't touch this! loglock = threading.Lock() class JavaWrapper: """ Wraps a java object as attribute '_item', and forwards __getattr__ to it. All the classes here derive from this """ def __init__(self, item): self._item = item def __getattr__(self, attr): return getattr(self._item, attr) class I2PDestination(JavaWrapper): """ Wraps java I2P destination objects, with a big difference - these objects store the private parts. """ @others def __init__(self, **kw): """ Versatile constructor Keywords (choose only one option): - (none) - create a whole new dest - dest, private - wrap an existing I2P java dest with private stream (private is a byte array) - bin - reconstitute a public-only dest from a binary string - binfile - reconstitute public-only from a binary file - binprivate - reconsistitute private dest from binary string - binfileprivate - reconsistitute private dest from binary file pathname - base64 - reconstitute public-only from base64 string - base64file - reconstitute public-only from file containing base64 - base64private - reconstitute private from string containing base64 - base64fileprivate - reconstitute private from file containing base64 also: - client - a java net.i2p.client.I2PClient object (avoids need for temporary client object when creating new dests) """ dest = i2p.data.Destination() JavaWrapper.__init__(self, dest) self._private = None if kw.has_key('dest'): self._item = kw['dest'] if kw.has_key('private'): self._private = kw['private'] elif kw.has_key('bin'): self.fromBin(kw['bin']) elif kw.has_key('binfile'): self.fromBinFilePrivate(kw['binfile']) elif kw.has_key('binprivate'): self.fromBinPrivate(kw['binprivate']) elif kw.has_key('binfileprivate'): self.fromBinFilePrivate(kw['binfileprivate']) elif kw.has_key('base64'): self.fromBase64(kw['base64']) elif kw.has_key('base64file'): self.fromBase64File(kw['base64file']) elif kw.has_key('base64private'): self.fromBase64Private(kw['base64private']) elif kw.has_key('base64fileprivate'): self.fromBase64FilePrivate(kw['base64fileprivate']) else: # create a whole new one, with a temporary client object (if needed) if kw.has_key('client'): client = kw['client'] else: client = clientFactory.createClient() bytestream = java.io.ByteArrayOutputStream() self._item = client.createDestination(bytestream) self._private = bytestream.toByteArray() def toBin(self): """ Returns a binary string of dest """ return bytearray2str(self.toByteArray()) def toBinFile(self, path): """ Writes out public binary to a file """ f = open(path, "wb") f.write(self.toBin()) f.flush() f.close() def toBinPrivate(self): """ Returns the private key string as binary """ if self._private == None: raise NoPrivateKey return bytearray2str(self._private) def toBinFilePrivate(self, path): """ Writes out a binary file with the dest info """ f = open(path, "wb") f.write(self.toBinPrivate()) f.flush() f.close() def toBase64(self): """ Returns base64 string of public part """ return self._item.toBase64() def toBase64Private(self): """ Exports dest as base64, including private stuff """ if self._private == None: raise NoPrivateKey return i2p.data.Base64.encode(self._private) def toBase64File(self, path): """ Exports dest to file as base64 """ f = open(path, "wb") f.write(self.toBase64()) f.flush() f.close() def toBase64FilePrivate(self, path): """ Writes out a base64 file with the private dest info """ f = open(path, "wb") f.write(self.toBase64Private()) f.flush() f.close() def fromBin(self, bin): """ Loads this dest from a binary string """ self._item.fromByteArray(str2bytearray(bin)) self._private = None def fromBinFile(self, path): """ Loads public part from file containing binary """ f = open(path, "rb") self.fromBin(f.read()) f.close() def fromBinPrivate(self, s): """ Loads this dest object from a base64 private key string """ bytes = str2bytearray(s) self._private = bytes stream = java.io.ByteArrayInputStream(bytes) self._item.readBytes(stream) def fromBinFilePrivate(self, path): """ Loads this dest object, given the pathname of a file containing a binary destkey """ self.fromBinPrivate(open(path, "rb").read()) def fromBase64(self, b64): """ Loads this dest from a base64 string """ self._item.fromBase64(b64) self._private = None def fromBase64File(self, path): """ Loads public part from file containing base64 """ f = open(path, "rb") self.fromBase64(f.read()) f.close() def fromBase64Private(self, s): """ Loads this dest object from a base64 private key string """ bytes = i2p.data.Base64.decode(s) self._private = bytes stream = java.io.ByteArrayInputStream(bytes) self._item.readBytes(stream) def fromBase64FilePrivate(self, path): """ Loads this dest from a base64 file containing private key """ self.fromBase64Private(open(path, "rb").read()) def sign(self, s): """ Signs a string using this dest's priv key """ # get byte stream bytes = str2bytearray(s) # stream up our private bytes stream = java.io.ByteArrayInputStream(self._private) # temporary dest object d = i2p.data.Destination() # suck the public part off the stream d.readBytes(stream) # temporary private key object privkey = i2p.data.PrivateKey() privkey.readBytes(stream) # now we should just have the signing key portion left in the stream signingkey = i2p.data.SigningPrivateKey() signingkey.readBytes(stream) # create DSA engine dsa = i2p.crypto.DSAEngine() sig = dsa.sign(bytes, signingkey) rawsig = bytearray2str(sig.getData()) return rawsig def verify(self, s, sig): """ Verifies a string against this dest, to test if it was actually signed by whoever has the dest privkey """ # get byte stream from data databytes = str2bytearray(s) # get signature stream from sig sigstream = java.io.ByteArrayInputStream(str2bytearray(sig)) # make a signature object signature = i2p.data.Signature() signature.readBytes(sigstream) # get signature verify key pubkey = self.getSigningPublicKey() #log(4, "databytes=%s, pubkey=%s" % (repr(databytes), repr(pubkey))) # now get a verification dsa = i2p.crypto.DSAEngine() result = dsa.verifySignature(signature, databytes, pubkey) return result def hasPrivate(self): """ Returns True if this dest has private parts, False if not """ if self._private: return 1 else: return 0 class I2PClient(JavaWrapper): """ jython-comfortable wrapper for java I2P client class """ @others def __init__(self, **kw): """ I2PClient constructor No args or keywords as yet """ client = clientFactory.createClient() JavaWrapper.__init__(self, client) def createDestination(self, **kw): """ Creates a destination, either a new one, or from a bin or base64 file Keywords: - see L{I2PDestination} constructor """ return I2PDestination(**kw) def createSession(self, dest, sessionClass=None, **kw): """ Create a session Arguments: - dest - an L{I2PDestination} object which MUST contain a private portion - sessionClass - if given, this should be a subclass of I2PSession. This allows you to implement your own handlers. Keywords: - session options (refer javadocs) """ if sessionClass is None: sessionClass = I2PSession if not dest.hasPrivate(): raise NoPrivateKey("Dest object has no private key") #print kw #session = self._item.createSession(destStream, dict2props(kw)) session = sessionClass(client=self, dest=dest, **kw) return session #return sessionClass(session=session) class I2PSession(JavaWrapper): """ Wraps an I2P client session You can subclass this, overriding the on_* handler callbacks, and pass it as an argument to I2PClient.createSession In the default 'on_message' callback, message retrieval is synchronous - inbound messages get written to an internal queue, which you can checked with numMessages() and retrieved from via getMessage(). You may override on_message() if you want to handle incoming messages asynchronously yourself. Note - as far as I can tell, this class should be thread-safe. """ @others host = i2cpHost port = i2cpPort def __init__(self, **kw): """ I2PSession constructor Keywords: - either: - session - a java i2p session object - or: - client - an L{I2PClient} object - dest - an L{I2PDestination} object Also: - listener - an L{I2PSessionListener} object. Router-level options: - reliability - one of 'guaranteed' and 'besteffort' (default 'besteffort') - host - host on which router is running - port - port on which router is listening """ # # grab options destined for java class # options = {} reliability = takeKey(kw, 'reliability', 'besteffort') if reliability == 'guaranteed': reliability = jI2PClient.PROP_RELIABILITY_GUARANTEED else: reliability = jI2PClient.PROP_RELIABILITY_BEST_EFFORT options[jI2PClient.PROP_RELIABILITY] = reliability host = takeKey(kw, 'host', self.host) options[jI2PClient.PROP_TCP_HOST] = host port = takeKey(kw, 'port', self.port) options[jI2PClient.PROP_TCP_PORT] = str(port) if kw.has_key('reliability'): reliability = kw['reliability'] if kw.has_key('listener'): listener = kw['listener'] del kw['listener'] else: listener = I2PSessionListener() #print options # # other keywords handled locally # if kw.has_key('session'): session = kw['session'] del kw['session'] JavaWrapper.__init__(self, session) elif kw.has_key('client') and kw.has_key('dest'): client = kw['client'] dest = kw['dest'] del kw['client'] del kw['dest'] destStream = java.io.ByteArrayInputStream(dest._private) session = self._item = client._item.createSession(destStream, dict2props(options)) #client.createSession(dest, dict2props(options)) else: raise Exception("implementation incomplete") # set up a listener self.setSessionListener(listener) # set up a queue for inbound msgs self.qInbound = Queue.Queue() self.lockInbound = threading.Lock() self.nInboundMessages = 0 self.lockOutbound = threading.Lock() def sendMessage(self, dest, payload): """ Sends a message to another dest Arguments: - dest - an L{I2PDestination} object - payload - a string to send """ dest = dest._item payload = str2bytearray(payload) self.lockOutbound.acquire() try: res = self._item.sendMessage(dest, payload) except: self.lockOutbound.release() raise self.lockOutbound.release() return res def numMessages(self): """ Returns the number of unretrieved inbound messages """ self.lockInbound.acquire() n = self.nInboundMessages self.lockInbound.release() return n def getMessage(self, blocking=1): """ Returns the next available inbound message. If blocking is set to 1 (default), blocks till another message comes in. If blocking is set to 0, returns None if there are no available messages. """ if blocking: msg = self.qInbound.get() #print "getMessage: acquiring lock" self.lockInbound.acquire() #print "getMessage: got lock" self.nInboundMessages -= 1 else: #print "getMessage: acquiring lock" self.lockInbound.acquire() #print "getMessage: got lock" if self.nInboundMessages > 0: msg = self.qInbound.get() self.nInboundMessages -= 1 else: msg = None self.lockInbound.release() #print "getMessage: released lock" return msg def setSessionListener(self, listener): """ Designates an L{I2PSessionListener} object to listen to this session """ self.listener = listener listener.addSession(self) self._item.setSessionListener(listener) def destroySession(self): """ Destroys an existing session Note that due to a jython quirk, calls to destroySession might trigger a TypeError relating to arg mismatch - we ignore such errors here because by the time the exception happens, the session has already been successfully closed """ try: self._item.destroySession() except TypeError: pass # # handler methods which you should override # @others def on_message(self, msg): """ Callback for when a message arrives. Appends the message to the inbound queue, which you can check with the numMessages() method, and read with getMessage() You should override this if you want to handle inbound messages asynchronously. Arguments: - msg - a string that was sent by peer """ #print "on_message: msg=%s" % msg self.lockInbound.acquire() #print "on_message: got lock" self.qInbound.put(msg) self.nInboundMessages += 1 self.lockInbound.release() #print "on_message: released lock" def on_abuse(self, severity): """ Callback indicating abuse is happening Arguments: - severity - an int of abuse level, 1-100 """ print "on_abuse: severity=%s" % severity def on_disconnected(self): """ Callback indicating remote peer disconnected """ print "on_disconnected" def on_error(self, message, error): """ Callback indicating an error occurred """ print "on_error: message=%s error=%s" % (message, error) class I2PSessionListener(i2p.client.I2PSessionListener): """ Wraps a java i2p.client.I2PSessionListener object """ def __init__(self, *sessions): self.sessions = list(sessions) def addSession(self, session): """ Adds an L{I2PSession} object to the list of sessions to listen on Note - you must also invoke the session's setSessionListener() method (see I2PSession.setSessionListener) """ if session not in self.sessions: self.sessions.append(session) def delSession(self, session): """ Stop listening to a given session """ if session in self.sessions: del self.sessions.index[session] def messageAvailable(self, session, msgId, size): """ Callback from java:: public void messageAvailable( I2PSession session, int msgId, long size) """ #print "listener - messageAvailable" # try to find session in our sessions table sessions = filter(lambda s, session=session: s._item == session, self.sessions) if sessions: #print "compare to self.session->%s" % (session == self.session._item) # found a matching session - retrieve it session = sessions[0] # retrieve message and pass to callback msg = session.receiveMessage(msgId) msgStr = bytearray2str(msg) session.on_message(msgStr) else: print "messageAvailable: unknown session=%s msgId=%s size=%s" % (session, msgId, size) def reportAbuse(self, session, severity): """ Callback from java:: public void reportAbuse( I2PSession session, int severity) """ if self.session: self.session.on_abuse(severity) else: print "reportAbuse: unknown session=%s severity=%s" % (session, severity) def disconnected(self, session): """ Callback from java:: public void disconnected(I2PSession session) """ if self.session: self.session.on_disconnected() else: print "disconnected: unknown session=%s" % session def errorOccurred(session, message, error): """ Callback from java:: public void errorOccurred( I2PSession session, java.lang.String message, java.lang.Throwable error) """ if self.session: self.session.on_error(message, error) else: print "errorOccurred: message=%s error=%s" % (message, error) class I2PSocket: """ Wraps I2P streaming API into a form resembling python sockets """ @others host = i2cpHost port = i2cpPort def __init__(self, dest=None, **kw): """ Create an I2P streaming socket Arguments: - dest - a private destination to associate with this socket Keywords: - host - hostname on which i2cp is listening (default self.host) - port - port on which i2cp listens (default self.port) Internally used keywords (used for wrapping an accept()ed connection): - dest - remdest - sock - instream - outstream """ # set up null attribs self.sockmgr = None self.instream = None self.outstream = None self.sock = None self._connected = 0 self._blocking = 1 # save dest (or lack thereof) self.dest = dest if kw.has_key('sock') \ and kw.has_key('remdest') \ and kw.has_key('instream') \ and kw.has_key('outstream'): # wrapping an accept()'ed connection log(4, "accept()'ed a connection, wrapping...") self.sock = kw['sock'] self.dest = dest self.remdest = kw['remdest'] self.instream = kw['instream'] self.outstream = kw['outstream'] else: log(4, "creating new I2PSocket %s" % dest) # process keywords self.host = kw.get('host', self.host) self.port = int(kw.get('port', self.port)) # we need a factory, don't we? self.sockmgrFact = i2p.client.streaming.I2PSocketManagerFactory() def bind(self, dest=None): """ 'binds' the socket to a dest dest is an I2PDestination object, which you may specify in the constructor instead of here. However, we give you the option of specifying here for some semantic compatibility with python sockets. """ if dest is not None: self.dest = dest elif not self.dest: # create new dest, client should interrogate it at some time log(4, "bind: socket has no dest, creating one") self.dest = I2PDestination() def listen(self, *args, **kw): """ Sets up the object to receive connections """ # sanity checks if self.sockmgr: raise I2PSocketError(".sockmgr already present - have you already called listen?") if not self.dest: raise I2PSocketError("socket is not bound to a destination") log(4, "listening on socket") # create the socket manager self._createSockmgr() def accept(self): """ Waits for incoming connections, and returns a new I2PSocket object with the connection """ # sanity check if not self.sockmgr: raise I2PSocketError(".listen() has not been called on this socket") # accept a conn and get its streams sock = self.sockmgr.getServerSocket().accept() instream = sock.getInputStream() outstream = sock.getOutputStream() remdest = I2PDestination(dest=sock.getPeerDestination()) # wrap it and return it sockobj = I2PSocket(dest=self.dest, remdest=remdest, sock=sock, instream=instream, outstream=outstream) self._connected = 1 return sockobj def connect(self, remdest): """ Connects to a remote destination This has one totally major difference from the normal socket paradigm, and that is that you can have n outbound connections to different dests. """ # sanity check if self.sockmgr: raise I2PSocketError(".sockmgr already present - have you already called listen/connect?") # create whole new dest if none was provided to constructor if self.dest is None: log(4, "connect: creating whole new dest") self.dest = I2PDestination() # create the socket manager self._createSockmgr() # do the connect #print "remdest._item = %s" % repr(remdest._item) opts = net.i2p.client.streaming.I2PSocketOptions() try: log(4, "trying to connect to %s" % remdest.toBase64()) sock = self.sock = self.sockmgr.connect(remdest._item, opts) self.remdest = remdest except: logException(2, "apparent exception, continuing...") self.instream = sock.getInputStream() self.outstream = sock.getOutputStream() sockobj = I2PSocket(dest=self.dest, remdest=remdest, sock=sock, instream=self.instream, outstream=self.outstream) self._connected = 1 return sockobj def recv(self, nbytes): """ Reads nbytes of data from socket """ # sanity check if not self.instream: raise I2PSocketError("Socket is not connected") # for want of better methods, read bytewise chars = [] while nbytes > 0: byte = self.instream.read() if byte < 0: break # got all we're gonna get char = chr(byte) chars.append(char) #print "read: got a byte %s (%s)" % (byte, repr(char)) nbytes -= 1 # got it all buf = "".join(chars) #print "recv: buf=%s" % repr(buf) return buf def send(self, buf): """ Sends buf thru socket """ # sanity check if not self.outstream: raise I2PSocketError("Socket is not connected") # and write it out log(4, "send: writing '%s' to outstream..." % repr(buf)) outstream = self.outstream for c in buf: outstream.write(ord(c)) # flush just in case log(4, "send: flushing...") self.outstream.flush() log(4, "send: done") def available(self): """ Returns the number of bytes available for recv() """ #print "available: sock is %s" % repr(self.sock) return self.instream.available() def close(self): """ Closes the socket """ # sanity check #if not self._connected: # raise I2PSocketError("Socket is not connected") # shut up everything try: self.instream.close() except: pass try: self.outstream.close() except: pass try: self.sock.close() except: pass def _createSockmgr(self): if getattr(self, 'sockmgr', None): return #options = {jI2PClient.PROP_TCP_HOST: self.host, # jI2PClient.PROP_TCP_PORT: self.port} options = {} props = dict2props(options) # get a java stream thing from dest stream = java.io.ByteArrayInputStream(self.dest._private) # create socket manager thing self.sockmgr = self.sockmgrFact.createManager(stream, self.host, self.port, props) class I2PSamServer(ThreadingTCPServer): """ A server which makes I2CP available via a socket """ @others host = i2psamhost port = i2psamport i2cphost = i2cpHost i2cpport = i2cpPort version = version def __init__(self, i2pclient=None, **kw): """ Create the client listener object Arguments: - i2pclient - an I2PClient object - optional - if not given, one will be created Keywords: - host - host to listen on for client conns (default self.host ('127.0.0.1') - port - port to listen on for client conns (default self.port (7656) - i2cphost - host to talk to i2cp on (default self.i2cphost ('127.0.0.1')) - i2cpport - port to talk to i2cp on (default self.i2cphost ('127.0.0.1')) """ # create an I2PClient object if none given if i2pclient is None: i2pclient = I2PClient() self.i2pclient = i2pclient # get optional host/port for client and i2cp self.host = kw.get('host', self.host) self.port = int(kw.get('port', self.port)) self.i2cphost = kw.get('i2cphost', self.i2cphost) self.i2cpport = int(kw.get('i2cpport', self.i2cpport)) # create record of current sessions, and a lock for it self.sessions = {} self.sessionsLock = threading.Lock() self.streams = {} self.streamsLock = threading.Lock() self.samNextId = 1 self.samNextIdLock = threading.Lock() # and create the server try: ThreadingTCPServer.__init__( self, (self.host, self.port), I2PSamClientHandler) except: log(4, "crashed with host=%s, port=%s" % (self.host, self.port)) raise def run(self): """ Run the SAM server. when connections come in, they are automatically accepted, and an L{I2PClientHandler} object created, and its L{handle} method invoked. """ log(4, "Listening for client requests on %s:%s" % (self.host, self.port)) self.serve_forever() def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" try: self.RequestHandlerClass(request, client_address, self) except: pass log(3, "Client session terminated") def samAllocId(self): """ Allocates a new unique id as required by SAM protocol """ self.samNextIdLock.acquire() id = self.samNextId self.samNextId += 1 self.samNextIdLock.release() return id class I2PSamClientHandler(StreamRequestHandler): r""" Manages a single socket connection from a client. When a client connects to the SAM server, the I2PSamServer object creates an instance of this class, and invokes its handle method. See L{handle}. Note that if a client terminates its connection to the server, the server will destroy all current connections initiated by that client Size values are decimal Connection is persistent """ @others def handle(self): """ Reads command/data messages from SAM Client, executes these, and sends back responses. Plants callback hooks into I2PSession objects, so that when data arrives via I2P, it can be immediately sent to the client. """ self.localsessions = {} self.globalsessions = self.server.sessions self.localstreams = {} # keyed by sam stream id self.globalstreams = self.server.streams self.samSessionIsOpen = 0 self.samSessionStyle = '' # localise the id allocator self.samAllocId = self.server.samAllocId # need a local sending lock self.sendLock = threading.Lock() log(5, "Got req from %s" % repr(self.client_address)) try: self.namingService = i2p.client.naming.HostsTxtNamingService() except: logException(2, "Failed to create naming service object") try: while 1: # get req req = self.rfile.readline().strip() flds = [s.strip() for s in req.split(" ")] cmd = flds[0] if cmd in ['HELLO', 'SESSION', 'STREAM', 'DATAGRAM', 'RAW', 'NAMING', 'DEST']: topic, subtopic, args = self.samParse(flds) method = getattr(self, "on_"+cmd, None) method(topic, subtopic, args) else: method = getattr(self, "on_"+cmd, None) if method: method(flds) else: # bad shit self.wfile.write("error unknown command '%s'\n" % cmd) except IOError: log(3, "Client connection terminated") except ValueError: pass except: logException(4, "Client req handler crashed") self.wfile.write("error\n") # clean up sessions for dest in self.localsessions.keys(): if dest in self.globalsessions.keys(): log(4, "forgetting global dest %s" % dest[:30]) del self.globalsessions[dest] self.finish() #thread.exit() def on_genkeys(self, flds): log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush # genkeys try: dest = I2PDestination() priv = dest.toBase64Private() pub = dest.toBase64() write("ok %s %s\n" % (pub, priv)) except: write("error exception\n") def on_createsession(self, flds): log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush sessionsLock.acquire() try: b64priv = flds[1] # spit if someone else already has this dest if b64priv in globalsessions.keys(): write("error dest in use\n") elif b64priv in self.localsessions.keys(): # duh, already open locally, treat as ok write("ok\n") else: # whole new session - set it up dest = I2PDestination(base64private=b64priv) log(4, "Creating session on dest '%s'" % b64priv[:40]) session = client.createSession(dest) log(4, "Connecting session on dest '%s'" % b64priv[:40]) session.connect() log(4, "Session on dest '%s' now live" % b64priv[:40]) # and remember it self.localsessions[b64priv] = session globalsessions[b64priv] = session # and tell the client the good news write("ok\n") except: logException(4, "createsession fail") write("error exception\n") sessionsLock.release() def on_destroysession(self, flds): log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush sessionsLock.acquire() try: b64priv = flds[1] # spit if session not known if not globalsessions.has_key(b64priv): # no such session presently exists anywhere write("error nosuchsession\n") elif not self.localsessions.has_key(b64priv): # session exists, but another client owns it write("error notyoursession\n") else: # session exists and we own it session = self.localsessions[b64priv] del self.localsessions[b64priv] del globalsessions[b64priv] try: session.destroySession() write("ok\n") except: raise except: logException(4, "destroy session failed") write("error exception\n") sessionsLock.release() log(4, "done") def on_send(self, flds): #log(4, "entered: %s" % repr(flds)) log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush sessionsLock.acquire() session = None try: size = int(flds[1]) b64priv = flds[2] b64peer = flds[3] msg = self._recvbytes(size) # spit if session not known if not globalsessions.has_key(b64priv): # no such session presently exists anywhere log(4, "no such session") write("error nosuchsession\n") elif not self.localsessions.has_key(b64priv): # session exists, but another client owns it write("error notyoursession\n") else: session = self.localsessions[b64priv] except: logException(2, "Send exception") write("error exception on send command\n") sessionsLock.release() if not session: return # now get/instantiate the remote dest try: peerDest = I2PDestination(base64=b64peer) except: peerDest = None logException(2, "Send: bad remote dest") write("error bad remote dest\n") if not peerDest: return # and do the send try: res = session.sendMessage(peerDest, msg) except: logException(2, "Send: failed") write("error exception on send\n") res = None if res is None: return # report result if res: write("ok\n") else: write("error send failed\n") log(4, "done") def on_receive(self, flds): log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush sessionsLock.acquire() session = None try: b64priv = flds[1] # spit if session not known if not globalsessions.has_key(b64priv): # no such session presently exists anywhere write("error nosuchsession\n") elif not self.localsessions.has_key(b64priv): # session exists, but another client owns it write("error notyoursession\n") else: session = self.localsessions[b64priv] except: logException(4, "receive command error") write("error exception on receive command\n") sessionsLock.release() if not session: log(4, "no session matching privdest %s" % b64priv[:30]) return # does this session have any received data? if session.numMessages() > 0: msg = session.getMessage() write("ok %s\n%s" % (len(msg), msg)) else: write("ok 0\n") log(4, "done") return def on_HELLO(self, topic, subtopic, args): """ Responds to client PING """ log(4, "entered") self.samSend("HELLO", "PONG") log(4, "responded to HELLO") def on_SESSION(self, topic, subtopic, args): log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions localsessions = self.localsessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush if subtopic == 'CREATE': if self.samSessionIsOpen: self.samSend("SESSION", "STATUS", RESULT="I2P_ERROR", MESSAGE="Session_already_created", ) return # get/validate STYLE arg style = self.samSessionStyle = args.get('STYLE', None) if style is None: self.samSend("SESSION", "STATUS", RESULT="I2P_ERROR", MESSAGE="Missing_STYLE_argument", ) return elif style not in ['STREAM', 'DATAGRAM', 'RAW']: self.samSend("SESSION", "STATUS", RESULT="I2P_ERROR", MESSAGE="Invalid_STYLE_argument_'%s'" % style, ) return # get/validate DESTINATION arg dest = args.get('DESTINATION', None) if dest == 'TRANSIENT': # create new temporary dest dest = self.samDest = I2PDestination() destb64 = dest.toBase64Private() else: # make sure dest isn't globally or locally known if dest in globalsessions.keys() or dest in localsessions.keys(): self.samSend("SESSION", "STATUS", RESULT="DUPLICATED_DEST", MESSAGE="Destination_'%s...'_already_in_use" % dest[:20], ) return # try to reconstitute dest from given base64 try: destb64 = dest dest = I2PDestination(base64private=dest) except: self.samSend("SESSION", "STATUS", RESULT="INVALID_KEY", MESSAGE="Bad_destination_base64_string_'%s...'" % destb64[:20], ) return # got valid dest now self.dest = dest self.samDestPub = dest.toBase64() if style in ['RAW', 'DATAGRAM']: if style == 'DATAGRAM': # we need to know how big binary pub dests and sigs self.samDestPubBin = dest.toBin() self.samDestPubBinLen = len(self.samDestPubBin) self.samSigLen = len(self.dest.sign("nothing")) log(4, "binary pub dests are %s bytes, sigs are %s bytes" % ( self.samDestPubBinLen, self.samSigLen)) i2cpHost = args.get('I2CP.HOST', server.i2cphost) i2cpPort = int(args.get('I2CP.PORT', server.i2cpport)) # both these styles require an I2PSession object session = client.createSession(dest, host=i2cpHost, port=i2cpPort) # plug in our inbound message handler session.on_message = self.on_message log(4, "Connecting session on dest '%s'" % destb64[:40]) try: session.connect() except net.i2p.client.I2PSessionException: self.samSend("SESSION", "STATUS", RESULT="I2P_ERROR", MESSAGE="Failed_to_connect_to_i2cp_port", ) logException(3, "Failed to connect I2PSession") return log(4, "Session on dest '%s' now live" % destb64[:40]) # and remember it localsessions[destb64] = session globalsessions[destb64] = session self.samSession = session else: # STREAM # no need to create session object, because we're using streaming api log(4, "Creating STREAM session") # what kind of stream? direction = args.get('DIRECTION', 'BOTH') if direction not in ['BOTH', 'RECEIVE', 'CREATE']: self.samSend("SESSION", "STATUS", RESULT="I2P_ERROR", MESSAGE="Illegal_direction_keyword_%s" % direction.replace(" ","_"), ) return if direction == 'BOTH': self.canConnect = 1 self.canAccept = 1 elif direction == 'RECEIVE': self.canConnect = 0 self.canAccept = 1 elif direction == 'CREATE': self.canConnect = 1 self.canAccept = 0 # but we do need to mark it as being in use localsessions[destb64] = globalsessions[destb64] = None # make a local socket sock = self.samSock = I2PSocket(dest) # and we also need to fire up a socket listener, if not CREATE-only if self.canAccept: thread.start_new_thread(self.threadSocketListener, (sock, dest)) # finally, we can reply with the good news self.samSend("SESSION", "STATUS", RESULT="OK", ) else: # subtopic != CREATE self.samSend("SESSION", "STATUS", RESULT="I2P_ERROR", MESSAGE="Invalid_command_'SESSION_%s'" % subtopic, ) return def on_SESSION_CREATE(self, topic, subtopic, args): log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions localsessions = self.localsessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush def on_STREAM(self, topic, subtopic, args): log(4, "entered") server = self.server client = server.i2pclient globalsessions = server.sessions sessionsLock = server.sessionsLock read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush if subtopic == 'CONNECT': # who are we connecting to again? remdest = I2PDestination(base64=args['DESTINATION']) id = int(args['ID']) try: log(4, "Trying to connect to remote peer %s..." % args['DESTINATION']) sock = self.samSock.connect(remdest) log(4, "Connected to remote peer %s..." % args['DESTINATION']) self.localstreams[id] = sock self.samSend("STREAM", "STATUS", RESULT='OK', ID=id, ) thread.start_new_thread(self.threadSocketReceiver, (sock, id)) except: log(4, "Failed to connect to remote peer %s..." % args['DESTINATION']) self.samSend("STREAM", "STATUS", RESULT='I2P_ERROR', MESSAGE='exception_on_connect', ID=id, ) elif subtopic == 'SEND': # send to someone id = int(args['ID']) try: sock = self.localstreams[id] sock.send(args['DATA']) except: logException(4, "send failed") def on_DATAGRAM(self, topic, subtopic, args): r""" DATAGRAM SEND DESTINATION=$base64key SIZE=$numBytes\n[$numBytes of data] All datagram messages have a signature/hash header, formatted as: - sender's binary public dest - S(H(sender_bin_pubdest + recipient_bin_pubdest + msg)) """ log(4, "entered") # at this stage of things, we don't know how to handle anything except SEND if subtopic != 'SEND': log(3, "Got illegal subtopic '%s' in DATAGRAM command" % subtopic) return # get the details peerdestb64 = args['DESTINATION'] peerdest = I2PDestination(base64=peerdestb64) peerdestBin = base64dec(peerdestb64) data = args['DATA'] # make up the header log(4, "samDestPubBin (%s) %s" % (type(self.samDestPubBin), repr(self.samDestPubBin))) log(4, "peerdestBin (%s) %s" % (type(peerdestBin), repr(peerdestBin))) log(4, "data (%s) %s" % (type(data), repr(data))) hashed = shahash(self.samDestPubBin + peerdestBin + data) log(4, "hashed=%s" % repr(hashed)) sig = self.dest.sign(hashed) log(4, "sig=%s" % repr(sig)) hdr = self.samDestPubBin + sig # send the thing self.samSession.sendMessage(peerdest, hdr + data) def on_RAW(self, topic, subtopic, args): r""" RAW SEND DESTINATION=$base64key SIZE=$numBytes\n[$numBytes of data] """ log(4, "entered") # at this stage of things, we don't know how to handle anything except SEND if subtopic != 'SEND': return # get the details peerdest = I2PDestination(base64=args['DESTINATION']) msg = args['DATA'] # send the thing self.samSession.sendMessage(peerdest, msg) def on_NAMING(self, topic, subtopic, args): log(4, "entered: %s %s %s" % (repr(topic), repr(subtopic), repr(args))) # at this stage of things, we don't know how to handle anything except LOOKUP if subtopic != 'LOOKUP': return # get the details host = args['NAME'] log(4, "looking up host %s" % host) # try to lookup jdest = self.namingService.lookup(host) if not jdest: log(4, "host %s not found" % host) self.samSend("NAMING", "REPLY", RESULT="KEY_NOT_FOUND", NAME=host, ) return try: b64 = I2PDestination(dest=jdest).toBase64() self.samSend("NAMING", "REPLY", RESULT="OK", NAME=host, VALUE=b64, ) log(4, "host %s found and valid key returned" % host) return except: log(4, "host %s found but key invalid" % host) self.samSend("NAMING", "REPLY", RESULT="INVALID_KEY", NAME=host, ) def on_DEST(self, topic, subtopic, args): log(4, "Generating dest") dest = I2PDestination() priv = dest.toBase64Private() pub = dest.toBase64() log(4, "Sending dest to client") self.samSend("DEST", "REPLY", PUB=pub, PRIV=priv) log(4, "done") def on_message(self, msg): """ This callback gets plugged into the I2PSession object, so we can asychronously notify our client when stuff arrives """ if self.samSessionStyle == 'RAW': self.samSend("RAW", "RECEIVE", msg) elif self.samSessionStyle == 'DATAGRAM': # ain't so simple, we gotta rip and validate the header remdestBin = msg[:self.samDestPubBinLen] log(4, "remdestBin=%s" % repr(remdestBin)) sig = msg[self.samDestPubBinLen:self.samDestPubBinLen+self.samSigLen] log(4, "sig=%s" % repr(sig)) data = msg[self.samDestPubBinLen+self.samSigLen:] log(4, "data=%s" % repr(data)) # now try to verify hashed = shahash(remdestBin + self.samDestPubBin + data) log(4, "hashed=%s" % repr(hashed)) remdest = I2PDestination(bin=remdestBin) if remdest.verify(hashed, sig): # fine - very good, pass it on log(4, "sig from peer is valid") self.samSend("DATAGRAM", "RECEIVE", data, DESTINATION=remdest.toBase64(), ) else: log(4, "DATAGRAM sig from peer is invalid") def threadSocketListener(self, sock, dest): """ Listens for incoming socket connections, and notifies the client accordingly """ destb64 = dest.toBase64() log(4, "Listening for connections to %s..." % destb64) sock.bind() sock.listen() while 1: log(4, "Awaiting next connection to %s..." % destb64) newsock = sock.accept() log(4, "Got connection to %s..." % destb64) # need an id, negative id = - self.server.samAllocId() # register it in local and global streams self.localstreams[id] = self.globalstreams[id] = newsock # fire up the receiver thread thread.start_new_thread(self.threadSocketReceiver, (newsock, id)) # who is connected to us? remdest = newsock.remdest remdest_b64 = remdest.toBase64() # and notify the client self.samSend("STREAM", "CONNECTED", DESTINATION=remdest_b64, ID=id) def samParse(self, flds): """ carves up a SAM command, returns it as a 3-tuple: - cmd - command string - subcmd - subcommand string - dargs - dict of args """ cmd = flds[0] subcmd = flds[1] args = flds[2:] dargs = {} for arg in args: try: name, val = arg.split("=", 1) except: logException(3, "failed to process %s in %s" % (repr(arg), repr(flds))) raise dargs[name] = val # read and add data if any if dargs.has_key('SIZE'): size = dargs['SIZE'] = int(dargs['SIZE']) dargs['DATA'] = self._recvbytes(size) #log(4, "\n".join([cmd+" "+subcmd] + [("%s=%s (...)" % (k,v[:40])) for k,v in dargs.items()])) log(4, "\n".join([cmd+" "+subcmd] + [("%s=%s (...)" % (k,v)) for k,v in dargs.items()])) return cmd, subcmd, dargs def samSend(self, topic, subtopic, data=None, **kw): """ Sends a SAM message (reply?) back to client Arguments: - topic - the first word in the reply, eg 'STREAM' - subtopic - the second word of the reply, eg 'CONNECTED' - data - a string of raw data to send back (optional) Keywords: - extra 'name=value' items to pass back. Notes: 1. SIZE is not required. If sending back data, it will be sized and a SIZE arg inserted automatically. 2. a dict of values can be passed to the 'args' keyword, in lieu of direct keywords. This allows for cases where arg names would cause python syntax clashes, eg 'tunnels.depthInbound' """ items = [topic, subtopic] # stick in SIZE if needed if data is not None: kw['SIZE'] = str(len(data)) else: data = '' # for later self.samCreateArgsList(kw, items) # and whack it together buf = " ".join(items) + '\n' + data # and ship it self.sendLock.acquire() try: self._sendbytes(buf) except: self.sendLock.release() raise self.sendLock.release() def samCreateArgsList(self, kw1, lst): for k,v in kw1.items(): if k == 'args': self.samCreateArgsList(v, lst) else: lst.append("=".join([str(k), str(v)])) def _sendbytes(self, raw): self.wfile.write(raw) self.wfile.flush() def _recvbytes(self, count): """ Does a guaranteed read of n bytes """ read = self.rfile.read chunks = [] needed = count while needed > 0: chunk = read(needed) chunklen = len(chunk) needed -= chunklen chunks.append(chunk) raw = "".join(chunks) # done return raw class NoPrivateKey(Exception): """Destination object has no private key""" class I2PSocketError(Exception): """Error working with I2PSocket objects""" def shahash(s): """ Calculates SHA Hash of a string, as a string, using I2P hashing facility """ h = net.i2p.crypto.SHA256Generator().calculateHash(s) h = bytearray2str(h.getData()) return h def base64enc(s): return net.i2p.data.Base64.encode(s) def base64dec(s): return bytearray2str(net.i2p.data.Base64.decode(s)) def str2bytearray(s): """ Convenience - converts python string to java-friendly byte array """ a = [] for c in s: n = ord(c) if n >= 128: n = n - 256 a.append(n) return a def bytearray2str(a): """ Convenience - converts java-friendly byte array to python string """ chars = [] for n in a: if n < 0: n += 256 chars.append(chr(n)) return "".join(chars) def byteoutstream2str(bs): """ Convenience - converts java-friendly byteoutputstream to python string """ chars = [] while 1: c = bs.read() if c >= 0: chars.append(chr(c)) else: break return "".join(chars) def dict2props(d): """ Converts a python dict d into a java.util.Properties object """ props = java.util.Properties() for k,v in d.items(): props[k] = str(v) return props def takeKey(somedict, keyname, default=None): """ Utility function to destructively read a key from a given dict. Same as the dict's 'takeKey' method, except that the key (if found) sill be deleted from the dictionary. """ if somedict.has_key(keyname): val = somedict[keyname] del somedict[keyname] else: val = default return val def log(level, msg, nPrev=0): # ignore messages that are too trivial for chosen verbosity if level > verbosity: return loglock.acquire() try: # rip the stack caller = traceback.extract_stack()[-(2+nPrev)] path, line, func = caller[:3] path = os.path.split(path)[1] full = "%s:%s:%s():\n* %s" % ( path, line, func, msg.replace("\n", "\n + ")) now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) msg = "%s %s\n" % (now, full) if logfile == sys.stdout: print msg else: file(logfile, "a").write(msg+"\n") except: s = StringIO.StringIO() traceback.print_exc(file=s) print s.getvalue() print "Logger crashed" loglock.release() def logException(level, msg=''): s = StringIO.StringIO() traceback.print_exc(file=s) log(level, "%s\n%s" % (s.getvalue(), msg), 1) def usage(detailed=0): print "Usage: %s <options> [<command>]" % sys.argv[0] if not detailed: print "Run with '-h' to get detailed help" sys.exit(0) print "I2PSAM is a bridge that allows I2P client programs to access the" print "I2P network by talking over a plaintext socket connection." print "References:" print " - http://www.freenet.org.nz/i2p - source, doco, downloadables" print " - http://drupal.i2p.net/node/view/144 - I2P SAM specification" print print "Options:" print " -h, -?, --help - display this help" print " -v, --version - print program version" print " -V, --verbosity=n - set verbosity to n, default 2, 1==quiet, 4==noisy" print " -H, --listenhost=host - specify host to listen on for client connections" print " -P, --listenport=port - port to listen on for client connections" print " --i2cphost=host - hostname of I2P router's I2CP interface" print " --i2cpport=port - port of I2P router's I2CP interface" print print "Commands:" print " (run with no commands to launch SAM server)" print " samserver - runs as a SAM server" print " test - run a suite of self-tests" print " testsocket - run only the socket test" print " testbidirsocket - run socket test in bidirectional mode" print sys.exit(0) def main(): argv = sys.argv argc = len(argv) # ------------------------------------------------- # do the getopt command line parsing try: opts, args = getopt.getopt(sys.argv[1:], "h?vV:H:P:", ['help', 'version', 'verbosity=', 'listenhost=', 'listenport=', 'i2cphost=', 'i2cpport=', ]) except: traceback.print_exc(file=sys.stdout) usage("You entered an invalid option") #print "args=%s" % args serveropts = {} for opt, val in opts: if opt in ['-h', '-?', '--help']: usage(1) elif opt in ['-v', '--version']: print "I2P SAM version %s" % version sys.exit(0) elif opt in ['-V', '--verbosity']: serveropts['verbosity'] = int(val) elif opt in ['-H', '--listenhost']: serveropts['host'] = val elif opt in ['-P', '--listenport']: serveropts['port'] = int(val) elif opt in ['--i2cphost']: serveropts['i2cphost'] = val elif opt in ['--i2cpport']: serveropts['i2cpport'] = int(val) else: usage(0) # -------------------------------------------------- # now run in required mode, default is 'samserver' if len(args) == 0: cmd = 'samserver' else: cmd = args[0] if cmd == 'samserver': log(2, "Running I2P SAM Server...") server = I2PSamServer(**serveropts) server.run() elif cmd == 'test': print "RUNNING full I2PSAM Jython TEST SUITE" testsigs() testdests() testsession() testsocket() elif cmd == 'testsocket': print "RUNNING SOCKET TEST" testsocket(0) elif cmd == 'testbidirsocket': print "RUNNING BIDIRECTIONAL SOCKET TEST" testsocket(1) else: # spit at unrecognised option usage(0) def testdests(): """ Demo function which tests out dest generation and import/export """ print print "********************************************" print "Testing I2P destination create/export/import" print "********************************************" print print "Generating a destination" d1 = I2PDestination() print "Exporting and importing dest1 in several forms" print "public binary string..." d1_bin = d1.toBin() d2_bin = I2PDestination(bin=d1_bin) print "public binary file..." d1.toBinFile("temp-d1-bin") d2_binfile = I2PDestination(binfile="temp-d1-bin") print "private binary string..." d1_binprivate = d1.toBinPrivate() d2_binprivate = I2PDestination(binprivate=d1_binprivate) print "private binary file..." d1.toBinFilePrivate("temp-d1-bin-private") d2_binfileprivate = I2PDestination(binfileprivate="temp-d1-bin-private") print "public base64 string..." d1_b64 = d1.toBase64() d2_b64 = I2PDestination(base64=d1_b64) print "public base64 file..." d1.toBase64File("temp-d1-b64") d2_b64file = I2PDestination(base64file="temp-d1-b64") print "private base64 string..." d1_base64private = d1.toBase64Private() d2_b64private = I2PDestination(base64private=d1_base64private) print "private base64 file..." d1.toBase64FilePrivate("temp-d1-b64-private") d2_b64fileprivate = I2PDestination(base64fileprivate="temp-d1-b64-private") print "All destination creation/import/export tests passed!" def testsigs(): global d1, d1pub, d1sig, d1res print print "********************************************" print "Testing I2P dest-based signatures" print "********************************************" print print "Creating dest..." d1 = I2PDestination() s_good = "original stuff that we're signing" s_bad = "non-original stuff we're trying to forge" print "Signing some shit against d1..." d1sig = d1.sign(s_good) print "Creating public dest d1pub" d1pub = I2PDestination(bin=d1.toBin()) print "Verifying original data with d1pub" res = d1pub.verify(s_good, d1sig) print "Result: %s (should be 1)" % repr(res) print "Trying to verify on a different string" res1 = d1pub.verify(s_bad, d1sig) print "Result: %s (should be 0)" % repr(res1) if res and not res1: print "signing/verifying test passed" else: print "SIGNING/VERIFYING TEST FAILED" def testsession(): global c, d1, d2, s1, s2 print print "********************************************" print "Testing I2P dest->dest messaging" print "********************************************" print print "Creating I2P client..." c = I2PClient() print "Creating destination d1..." d1 = c.createDestination() print "Creating destination d2..." d2 = c.createDestination() print "Creating destination d3..." d3 = c.createDestination() print "Creating session s1 on dest d1..." s1 = c.createSession(d1, host='localhost', port=7654) print "Creating session s2 on dest d2..." s2 = c.createSession(d2) print "Connecting session s1..." s1.connect() print "Connecting session s2..." s2.connect() print "Sending message from s1 to d2..." s1.sendMessage(d2, "Hi there, s2!!") print "Retrieving message from s2..." print "got: %s" % repr(s2.getMessage()) print "Sending second message from s1 to d2..." s1.sendMessage(d2, "Hi there again, s2!!") print "Retrieving message from s2..." print "got: %s" % repr(s2.getMessage()) print "Sending message from s1 to d3 (should take ages then fail)..." res = s1.sendMessage(d3, "This is futile!!") print "result of that send was %s (should have been 0)" % res print "Destroying session s1..." s1.destroySession() print "Destroying session s2..." s2.destroySession() print "session tests passed!" def testsocket(bidirectional=0): global d1, d2, s1, s2 print print "********************************************" print "Testing I2P streaming interface" print "********************************************" print print "Creating destinations..." dServer = I2PDestination() dClient = I2PDestination() print "Creating sockets..." sServer = I2PSocket(dServer) sClient = I2PSocket(dClient) # server thread which simply reads a line at a time, then echoes # that line back to the client def servThread(s): print "server: binding socket" s.bind() print "server: setting socket to listen" s.listen() print "server: awaiting connection" sock = s.accept() print "server: got connection" sock.send("Hello, echoing...\n") buf = '' while 1: c = sock.recv(1) if c == '': sock.close() print "server: socket closed" break buf += c if c == '\n': sock.send("SERVER: "+buf) buf = '' # client thread which reads lines and prints them to stdout def clientThread(s): buf = '' while 1: c = s.recv(1) if c == '': s.close() print "client: socket closed" break buf += c if c == '\n': print "client: got %s" % repr(buf) buf = '' print "launching server thread..." thread.start_new_thread(servThread, (sServer,)) if bidirectional: # dummy thread which accepts connections TO client socket def threadDummy(s): print "dummy: listening" s.listen() print "dummy: accepting" sock = s.accept() print "dummy: got connection" print "test - launching dummy client accept thread" thread.start_new_thread(threadDummy, (sClient,)) print "client: trying to connect" sClient.connect(dServer) print "client: connected, launching rx thread" thread.start_new_thread(clientThread, (sClient,)) while 1: line = raw_input("Enter something (q to quit)> ") if line == 'q': print "closing client socket" sClient.close() break sClient.send(line+"\n") print "I2PSocket test apparently succeeded" if __name__ == '__main__': main() @first #!/usr/bin/env python """ Implements a client API for I2CP messaging via SAM Very simple I2P messaging interface, which should prove easy to reimplement in your language of choice This module can be used from cpython or jython Run this module without arguments to see a demo in action (requires SAM server to be already running) """ @others import sys, os, socket, thread, threading, Queue, traceback, StringIO, time from pdb import set_trace # ----------------------------------------- # server access settings i2psamhost = '127.0.0.1' i2psamport = 7656 # ------------------------------------------ # logging settings # 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful verbosity = 5 # change to a filename to log there instead logfile = sys.stdout # when set to 1, and when logfile != sys.stdout, log msgs are written # both to logfile and console stdout log2console = 1 # don't touch this! loglock = threading.Lock() class I2PServerFail(Exception): """ A failure in connecting to the I2CP server """ class I2PCommandFail(Exception): """ A failure in an I2CP command """ pass class I2PStreamClosed(Exception): """ Stream is not open """ class I2PSamClient: """ Implements a reference client for accessing I2CP via i2psam Connects to i2psam's I2PSamServer, sends commands and receives results The primitives should be reasonably self-explanatory Usage summary: 1. create one or more I2PSamClient instances per process (1 should be fine) 2. invoke the L{genkeys} method to create destination keypairs 3. create sessions objects via the L{createSession} method 4. use these session objects to send and receive data 5. destroy the session objects when you're done Refer to the function L{demo} for a simple example """ @others # server host/port settings exist here in case you might # have a reason for overriding in a subclass host = i2psamhost port = i2psamport i2cpHost = None i2cpPort = None def __init__(self, **kw): """ Creates a client connection to i2psam listener Keywords: - host - host to connect to (default 127.0.0.1) - port - port to connect to (default 7656) """ # get optional host/port log(4, "entered") self.host = kw.get('host', self.host) self.port = int(kw.get('port', self.port)) self.cmdLock = threading.Lock() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.lockHello = threading.Lock() self.sendLock = threading.Lock() self.qNewDests = Queue.Queue() self.qSession = Queue.Queue() self.qDatagrams = Queue.Queue() self.qRawMessages = Queue.Queue() self.namingReplies = {} self.namingCache = {} self.streams = {} # currently open streams, keyed by id self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id self.qNewStreams = Queue.Queue() # incoming connections self.samNextIdLock = threading.Lock() self.samNextId = 1 self.isRunning = 1 log(4, "trying connection to SAM server...") try: self.sock.connect((self.host, self.port)) except: raise I2PServerFail( "Connection to i2psam server failed\n" "(are you sure your I2P router is running, and\n" "listening for I2CP connections on %s:%s?)" % (self.host, self.port) ) # fire up receiver thread thread.start_new_thread(self.threadRx, ()) # ping the server try: log(4, "trying to ping SAM server...") self.samHello() except: logException(4, "Exception on handshaking") raise I2PServerFail("Failed to handshake with i2psam server") # connected fine log(2, "I2CP Client successfully connected") def createSession(self, privdest): """ DEPRECATED - use sam* methods instead! Creates a session using private destkey """ #3. createsession: # - client->server: # - createsession <base64private>\n # - server->client: # - ok\n OR # - error[ <reason>]\n self.cmdLock.acquire() try: self._sendline("createsession %s" % privdest) respitems = self._recvline().split(" ", 1) if respitems[0] == 'ok': res = None else: res = respitems[1] except: logException(2, "createsession fail") self.cmdLock.release() raise self.cmdLock.release() if res: raise I2PCommandFail("createsession fail: "+res) return I2PRemoteSession(self, privdest) def destroySession(self, privdest): """ DEPRECATED - use sam* methods instead! Destrlys a session using private destkey """ #4. destroysession: # - client->server: # - destroysession <base64private>\n # - server->client: # - ok\n OR # - error[ <reason>]\n self.cmdLock.acquire() try: self._sendline("destroysession %s" % privdest) respitems = self._recvline().split(" ", 1) if respitems[0] == 'ok': res = None else: res = respitems[1] except: logException(2, "destroysession fail") self.cmdLock.release() raise self.cmdLock.release() if res: raise I2PCommandFail("destroysession fail: " + res) return res def send(self, privdest, peerdest, msg): """ DEPRECATED - use sam* methods instead! Sends a block of data from local dest to remote dest """ #5. send: # - client->server: # - send <size> <localbase64private> <remotebase64dest>\ndata # - server->client: # - ok\n OR # - error[ <reason>]\n self.cmdLock.acquire() try: self._sendline("send %s %s %s" % (len(msg), privdest, peerdest)) self._sendbytes(msg) line = self._recvline() #print "** %s" % line respitems = line.split(" ", 1) if respitems[0] == 'ok': res = None else: res = " ".join(respitems[1:]) except: logException(2, "send fail") self.cmdLock.release() raise self.cmdLock.release() if res: raise I2PCommandFail("send fail: " + res) return res def receive(self, privdest): """ DEPRECATED - use sam* methods instead! receives a block of data, returning string, or None if no data available """ #6. receive: # - client->server: # - receive <localbase64private>\n # - server->client: # - ok <size>\ndata OR # - error[ <reason>]\n self.cmdLock.acquire() try: self._sendline("receive %s" % privdest) respitems = self._recvline().split(" ", 1) if respitems[0] == 'ok': res = None size = int(respitems[1]) msg = self._recvbytes(size) res = None else: res = respitems[1] except: logException(2, "receive fail") self.cmdLock.release() raise self.cmdLock.release() if res: raise I2PCommandFail("destroysession fail: " + res) return msg def samHello(self): """ Sends a quick HELLO PING to SAM server and awaits response Arguments: - none Keywords: - none Returns: - nothing (None) if ping sent and pong received, or raises an exception if failed """ self.lockHello.acquire() self.samSend("HELLO", "PING") self.lockHello.acquire() self.lockHello.release() def samSessionCreate(self, style, dest, **kw): """ Creates a SAM session Arguments: - style - one of 'STREAM', 'DATAGRAM' or 'RAW' - dest - base64 private destination Keywords: - direction - only used for STREAM sessions, can be RECEIVE, CREATE or BOTH (default BOTH) - i2cphost - hostname for the SAM bridge to contact i2p router on - i2cpport - port for the SAM bridge to contact i2p router on Returns: - 'OK' if session was created successfully, or a tuple (keyword, message) if not """ kw1 = dict(kw) kw1['STYLE'] = self.samStyle = style kw1['DESTINATION'] = dest if style == 'STREAM': direction = kw.get('direction', 'BOTH') kw1['DIRECTION'] = direction if direction == 'BOTH': self.canAccept = 1 self.canConnect = 1 elif direction == 'RECEIVE': self.canAccept = 1 self.canConnect = 0 elif direction == 'CREATE': self.canAccept = 0 self.canConnect = 1 else: raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH") # stick in i2cp host/port if specified if kw.has_key('i2cphost'): kw1['I2CP.HOST'] = kw['i2cphost'] if kw.has_key('i2cpport'): kw1['I2CP.PORT'] = kw['i2cpport'] self.samSend("SESSION", "CREATE", **kw1) subtopic, args = self.qSession.get() if args['RESULT'] == 'OK': return 'OK' else: return (args['RESULT'], args['MESSAGE']) def samDestGenerate(self): """ Creates a whole new dest and returns an tuple pub, priv as base64 public and private destination keys """ self.samSend("DEST", "GENERATE") pub, priv = self.qNewDests.get() return pub, priv def samRawSend(self, peerdest, msg): """ Sends a raw anon message to another peer peerdest is the public base64 destination key of the peer """ self.samSend("RAW", "SEND", msg, DESTINATION=peerdest, ) def samRawCheck(self): """ Returns 1 if there are received raw messages available, 0 if not """ return not self.qRawMessages.empty() def samRawReceive(self, blocking=1): """ Returns the next raw message available, blocking if none is available and the blocking arg is set to 0 If blocking is 0, and no messages are available, returns None. Remember that you can check for availability with the .samRawCheck() method """ if not blocking: if self.qRawMessages.empty(): return None return self.qRawMessages.get() def samDatagramSend(self, peerdest, msg): """ Sends a repliable datagram message to another peer peerdest is the public base64 destination key of the peer """ self.samSend("DATAGRAM", "SEND", msg, DESTINATION=peerdest, ) def samDatagramCheck(self): """ Returns 1 if there are datagram messages received messages available, 0 if not """ return not self.qDatagrams.empty() def samDatagramReceive(self, blocking=1): """ Returns the next datagram message available, blocking if none is available. If blocking is set to 0, and no messages are available, returns None. Remember that you can check for availability with the .samRawCheck() method Returns 2-tuple: dest, msg where dest is the base64 destination of the peer from whom the message was received """ if not blocking: if self.qDatagrams.empty(): return None return self.qDatagrams.get() def samNamingLookup(self, host): """ Looks up a host in hosts.txt """ # try the cache first if self.namingCache.has_key(host): log(4, "found host %s in cache" % host) return self.namingCache[host] # make a queue for reply q = self.namingReplies[host] = Queue.Queue() # send off req self.samSend("NAMING", "LOOKUP", NAME=host, ) # get resp resp = q.get() result = resp.get('RESULT', 'none') if result == 'OK': log(4, "adding host %s to cache" % host) val = resp['VALUE'] self.namingCache[host] = val return val else: raise I2PCommandFail("Error looking up '%s': %s %s" % ( host, result, resp.get('MESSAGE', ''))) def samParse(self, flds): """ carves up a SAM command, returns it as a 3-tuple: - cmd - command string - subcmd - subcommand string - dargs - dict of args """ cmd = flds[0] subcmd = flds[1] args = flds[2:] dargs = {} for arg in args: try: name, val = arg.split("=", 1) except: logException(3, "failed to process %s in %s" % (repr(arg), repr(flds))) raise dargs[name] = val # read and add data if any if dargs.has_key('SIZE'): size = dargs['SIZE'] = int(dargs['SIZE']) dargs['DATA'] = self._recvbytes(size) #log(4, "\n".join([cmd+" "+subcmd] + [("%s=%s (...)" % (k,v[:40])) for k,v in dargs.items()])) log(4, "\n".join([cmd+" "+subcmd] + [("%s=%s (...)" % (k,v)) for k,v in dargs.items()])) return cmd, subcmd, dargs def samSend(self, topic, subtopic, data=None, **kw): """ Sends a SAM message (reply?) back to client Arguments: - topic - the first word in the reply, eg 'STREAM' - subtopic - the second word of the reply, eg 'CONNECTED' - data - a string of raw data to send back (optional) Keywords: - extra 'name=value' items to pass back. Notes: 1. SIZE is not required. If sending back data, it will be sized and a SIZE arg inserted automatically. 2. a dict of values can be passed to the 'args' keyword, in lieu of direct keywords. This allows for cases where arg names would cause python syntax clashes, eg 'tunnels.depthInbound' """ items = [topic, subtopic] # stick in SIZE if needed if data is not None: kw['SIZE'] = str(len(data)) else: data = '' # for later self.samCreateArgsList(kw, items) # and whack it together buf = " ".join(items) + '\n' + data # and ship it self.sendLock.acquire() try: self._sendbytes(buf) except: self.sendLock.release() raise self.sendLock.release() def samCreateArgsList(self, kw1, lst): for k,v in kw1.items(): if k == 'args': self.samCreateArgsList(v, lst) else: lst.append("=".join([str(k), str(v)])) def threadRx(self): """ Handles all incoming stuff from SAM, storing in local queues as appropriate """ while self.isRunning: try: log(4, "Awaiting next message from server") line = self._recvline() if line == '': log(3, "I2P server socket closed") return flds = line.split(" ") topic, subtopic, args = self.samParse(flds) log(4, "Got %s %s %s" % (topic, subtopic, args)) handleMsg = getattr(self, "on_"+topic, None) if handleMsg: handleMsg(topic, subtopic, args) else: log(2, "No handler for '%s' message" % topic) except: #logException(3, "Exception handling %s %s\n%s" % (topic, subtopic, args)) logException(3, "Exception handling %s" % repr(line)) def on_HELLO(self, topic, subtopic, args): """ Handles HELLO PONG messages from server """ # just wake up the caller log(4, "got HELLO") self.lockHello.release() def on_SESSION(self, topic, subtopic, args): """ Handles SESSION messages from server """ # just stick whatever on the queue and wake up the caller res = subtopic, args self.qSession.put(res) def on_STREAM(self, topic, subtopic, args): """ Handles STREAM messages from server STREAM STATUS RESULT=$result ID=$id [MESSAGE=...] STREAM CONNECTED DESTINATION=$base64key ID=$id STREAM RECEIVED ID=$id SIZE=$numBytes\n[$numBytes of data] STREAM CLOSED RESULT=$result ID=$id [MESSAGE=...] """ log(4, "got %s %s %s" % (topic, subtopic, args)) # which stream? id = int(args['ID']) # result of prior connection attempt if subtopic == 'STATUS': # stick it on the queue that the caller is waiting on and let the # caller interpret the result self.streamConnectReplies[id].put(args) return # notice of incoming connection if subtopic == 'CONNECTED': # grab details dest = args['DESTINATION'] # wrap it in a stream obj conn = I2PSAMStream(self, id, dest) self.streams[id] = conn # and put it there for anyone calling samStreamAccept() self.qNewStreams.put(conn) # done return # notice of received data elif subtopic == 'RECEIVED': # grab details data = args['DATA'] # lookup the connection conn = self.streams.get(id, None) if not conn: # conn not known, just ditch log(2, "got data, but don't recall any conn with id %s" % id) return # and post the received data conn._notifyIncomingData(data) log(4, "wrote data to conn's inbound queue") # done return elif subtopic == 'CLOSED': # lookup the connection conn = self.streams.get(id, None) if not conn: # conn not known, just ditch return # mark conn as closed and forget it conn._notifyIncomingData("") # special signal to close conn.isOpen = 0 del self.streams[id] # done return def on_DATAGRAM(self, topic, subtopic, args): """ Handles DATAGRAM messages from server """ remdest = args['DESTINATION'] data = args['DATA'] self.qDatagrams.put((remdest, data)) def on_RAW(self, topic, subtopic, args): """ Handles RAW messages from server """ data = args['DATA'] log(3, "Got anonymous datagram %s" % repr(data)) self.qRawMessages.put(data) def on_NAMING(self, topic, subtopic, args): """ Handles NAMING messages from server """ # just find out hostname, and stick it on resp q host = args['NAME'] self.namingReplies[host].put(args) def on_DEST(self, topic, subtopic, args): """ Handles DEST messages from server """ pubkey = args['PUB'] privkey = args['PRIV'] res = pubkey, privkey self.qNewDests.put(res) def _recvline(self): """ Guaranteed read of a full line """ chars = [] while 1: c = self.sock.recv(1) if c in ['', '\n']: break chars.append(c) return "".join(chars) def _recvbytes(self, num): """ Guaranteed read of num bytes """ if num <= 0: return "" reqd = num chunks = [] while reqd > 0: chunk = self.sock.recv(reqd) if not chunk: raise I2PServerFail("Buffer read fail") chunks.append(chunk) reqd -= len(chunk) return "".join(chunks) def _sendbytes(self, buf): """ Guaranteed complete send of a buffer """ reqd = len(buf) while reqd > 0: nsent = self.sock.send(buf) if nsent == 0: raise I2PServerFail("Send to server failed") buf = buf[nsent:] reqd -= nsent def _sendline(self, line): """ just tacks on a newline and sends """ self._sendbytes(line+"\n") class I2PRemoteSession: """ DEPRECATED Wrapper for I2CP connections Do not instantiate this directly - it gets created by I2PSamClient.createSession() """ @others def __init__(self, client, dest): """ Do not instantiate this directly """ self.client = client self.dest = dest def send(self, peerdest, msg): """ """ return self.client.send(self.dest, peerdest, msg) def receive(self): return self.client.receive(self.dest) def destroy(self): return self.client.destroySession(self.dest) def log(level, msg, nPrev=0): # ignore messages that are too trivial for chosen verbosity if level > verbosity: return loglock.acquire() try: # rip the stack caller = traceback.extract_stack()[-(2+nPrev)] path, line, func = caller[:3] path = os.path.split(path)[1] full = "%s:%s:%s():\n* %s" % ( path, line, func, msg.replace("\n", "\n + ")) now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) msg = "%s %s\n" % (now, full) if logfile == sys.stdout: print msg else: file(logfile, "a").write(msg+"\n") except: s = StringIO.StringIO() traceback.print_exc(file=s) print s.getvalue() print "Logger crashed" loglock.release() def logException(level, msg=''): s = StringIO.StringIO() traceback.print_exc(file=s) log(level, "%s\n%s" % (s.getvalue(), msg), 1) def demoNAMING(): """ Demonstrates the NAMING service """ print "Starting SAM NAMING demo..." print print "Instantiating client connection..." c0 = I2PSamClient() print "Client connection created" for host in ['duck.i2p', 'nonexistent.i2p']: print "Sending query for host '%s'..." % host try: res = c0.samNamingLookup(host) print "query for %s returned:" % host print repr(res) except I2PCommandFail, e: print "got exception: %s" % repr(e.args) print print "---------------------------------" print "NAMING service tests succeeded" print "---------------------------------" print def demoRAW(): """ Runs a demo of SAM RAW messaging """ print "Starting SAM RAW demo..." print print "Instantiating client connections..." c1 = I2PSamClient() c2 = I2PSamClient() print "Creating dests via SAM" pub1, priv1 = c1.samDestGenerate() pub2, priv2 = c2.samDestGenerate() print "SAM Dests generated ok" print "Creating SAM RAW SESSION on connection c1..." res = c1.samSessionCreate("RAW", priv1) if res != 'OK': print "Failed to create session on connection c1: %s" % repr(res) return print "Session on connection c1 created successfully" print "Creating SAM SESSION on connection c2..." res = c2.samSessionCreate("RAW", priv2) if res != 'OK': print "Failed to create session on connection c2: %s" % repr(res) return print "Session on connection c2 created successfully" msg = "Hi there!" print "sending from c1 to c2: %s" % repr(msg) c1.samRawSend(pub2, msg) print "now try to receive from c2 (will block)..." msg1 = c2.samRawReceive() print "Connection c2 got %s" % repr(msg1) print print "---------------------------------" print "RAW data transfer tests succeeded" print "---------------------------------" print def demoDATAGRAM(): """ Runs a demo of SAM DATAGRAM messaging """ print "Starting SAM DATAGRAM demo..." print print "Instantiating 2 more client connections..." c3 = I2PSamClient() c4 = I2PSamClient() print "Creating more dests via SAM" pub3, priv3 = c3.samDestGenerate() pub4, priv4 = c4.samDestGenerate() print "Creating SAM DATAGRAM SESSION on connection c3..." res = c3.samSessionCreate("DATAGRAM", priv3) if res != 'OK': print "Failed to create DATAGRAM session on connection c3: %s" % repr(res) return print "DATAGRAM Session on connection c3 created successfully" print "Creating SAM DATAGRAM SESSION on connection c4..." res = c4.samSessionCreate("DATAGRAM", priv4) if res != 'OK': print "Failed to create DATAGRAM session on connection c4: %s" % repr(res) return print "Session on connection c4 created successfully" msg = "Hi there, this is a datagram!" print "sending from c3 to c4: %s" % repr(msg) c3.samDatagramSend(pub4, msg) print "now try to receive from c4 (will block)..." remdest, msg1 = c4.samDatagramReceive() print "Connection c4 got %s from %s..." % (repr(msg1), repr(remdest)) print print "--------------------------------------" print "DATAGRAM data transfer tests succeeded" print "--------------------------------------" print def demoSTREAM(): """ Runs a demo of SAM STREAM messaging """ print "Starting SAM STREAM demo..." print print "Instantiating client c6..." c6 = I2PSamClient() print "Creating dest for c6" pub6, priv6 = c6.samDestGenerate() print "Creating SAM STREAM SESSION on connection c6..." res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE") if res != 'OK': print "Failed to create STREAM session on connection c6: %s" % repr(res) return print "STREAM Session on connection c6 created successfully" print "Launching acceptor thread..." thread.start_new_thread(demoSTREAM_thread, (c6,)) #print "sleep a while and give the server a chance..." #time.sleep(10) print "----------------------------------------" print "Instantiating client c5..." c5 = I2PSamClient() print "Creating dest for c5" pub5, priv5 = c5.samDestGenerate() print "Creating SAM STREAM SESSION on connection c5..." res = c5.samSessionCreate("STREAM", priv5, direction="CREATE") if res != 'OK': print "Failed to create STREAM session on connection c5: %s" % repr(res) return print "STREAM Session on connection c5 created successfully" print "----------------------------------------" print "Making connection from c5 to c6..." #set_trace() try: conn_c5 = c5.samStreamConnect(pub6) except: print "Stream Connection failed" return print "Stream connect succeeded" print "Receiving from c5..." buf = conn_c5.readline() print "Got %s" % repr(buf) #print "Try to accept connection on c6..." #conn_c6 = c6.sam print print "--------------------------------------" print "DATAGRAM data transfer tests succeeded" print "--------------------------------------" print def demo(): """ This is a simple and straightforward demo of talking to the i2psam server socket via the I2PSamClient class. Read the source, Luke, it's never been so easy... """ print print "-----------------------------------------" print "Running i2psamclient demo..." print "-----------------------------------------" print #demoNAMING() #demoRAW() #demoDATAGRAM() demoSTREAM() print print "-----------------------------------------" print "Demo Finished" print "-----------------------------------------" return if __name__ == '__main__': demo() def samStreamConnect(self, dest): """ Makes a STREAM connection to a remote dest STREAM STATUS RESULT=$result ID=$id [MESSAGE=...] """ # need an ID id = self.samAllocId() # create queue for connect reply q = self.streamConnectReplies[id] = Queue.Queue() # send req self.samSend("STREAM", "CONNECT", ID=id, DESTINATION=dest, ) # await reply - comes back as a dict resp = q.get() # ditch queue del self.streamConnectReplies[id] del q # check out response result = resp['RESULT'] if result == 'OK': conn = I2PSAMStream(self, id, dest) self.streams[id] = conn return conn else: msg = resp.get('MESSAGE', '') raise I2PCommandFail(result, msg, "STREAM CONNECT") def samAllocId(self): """ Allocates a new unique id as required by SAM protocol """ self.samNextIdLock.acquire() id = self.samNextId self.samNextId += 1 self.samNextIdLock.release() return id class I2PSAMStream: """ Wrapper for a stream object """ @others def __init__(self, client, id, dest): """ """ self.client = client self.id = id self.dest = dest self.qIncomingData = Queue.Queue() self.inbuf = '' self.isOpen = 1 def _notifyIncomingData(self, data): """ Called by client receiver to notify incoming data """ log(4, "got %s" % repr(data)) self.qIncomingData.put(data) def samStreamSend(self, conn, data): """ DO NOT CALL THIS DIRECTLY Invoked by an I2PSAMStream object to transfer data Use the object's .send() method instead. conn is the I2PSAMStream STREAM SEND ID=$id SIZE=$numBytes\n[$numBytes of data] """ # dispatch self.samSend("STREAM", "SEND", data, ID=conn.id) # useless, but mimics socket paradigm return len(data) def send(self, data): """ Sends data to a stream connection """ # barf if stream not open if not self.isOpen: raise I2PStreamClosed # can send return self.client.samStreamSend(self, data) def samStreamClose(self, conn): """ DO NOT CALL DIRECTLY Invoked by I2PSAMStream to close stream Use the object's .send() method instead. STREAM CLOSE ID=$id """ self.samSend("STREAM", "CLOSE", ID=conn.id) del self.streams[conn.id] def recv(self, size): """ Retrieves n bytes from peer """ chunks = [] while self.isOpen and size > 0: # try internal buffer first if self.inbuf: chunk = self.inbuf[:size] chunklen = len(chunk) self.inbuf = self.inbuf[chunklen:] chunks.append(chunk) size -= chunklen else: # replenish input buffer log(4, "I2PSAMStream.recv: replenishing input buffer") buf = self.qIncomingData.get() if buf == '': # connection closed by peer self.isOpen = 0 break else: # got more data log(4, "I2PSAMStream: queue returned %s" % repr(buf)) self.inbuf += buf # return whatever we've got, hopefully all return "".join(chunks) def close(self): """ close this stream connection """ log(4, "closing stream") self.client.samStreamClose(self) log(4, "stream closed") self.isOpen = 0 # and just to make sure... self.qIncomingData.put("") # busts out of recv() loops def __del__(self): """ Dropping last ref to this object closes stream """ self.close() def demoSTREAM_thread(sess): while 1: sock = sess.samStreamAccept() log(4, "got incoming connection") print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING" time.sleep(10) sock.send("Hi there, what do you want?\n") print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING" time.sleep(300) print "**ACCEPTOR CLOSING STREAM" sock.close() def samStreamAccept(self): """ Waits for an incoming connection, returning a wrapped conn obj """ log(4, "waiting for connection") conn = self.qNewStreams.get() log(4, "got connection") return conn def threadSocketReceiver(self, sock, id): """ One of these gets launched each time a new stream connection is created. Due to the lack of callback mechanism within the ministreaming API, we have to actively poll for and send back received data """ while 1: #avail = sock.available() #if avail <= 0: # print "threadSocketReceiver: waiting for data on %s (%s avail)..." % (id, avail) # time.sleep(5) # continue #log(4, "reading a byte") try: buf = sock.recv(1) except: logException(4, "Exception reading first byte") if buf == '': log(4, "stream closed") # notify a close self.samSend("STREAM", "CLOSED", ID=id) return # grab more if there's any available navail = sock.available() if navail > 0: #log(4, "%d more bytes available, reading..." % navail) rest = sock.recv(navail) buf += rest # send if off log(4, "got from peer: %s" % repr(buf)) self.samSend("STREAM", "RECEIVED", buf, ID=id, ) def readline(self): """ Read a line of text from stream, return the line without trailing newline This method really shouldn't exist in a class that's trying to look a bit like a socket object, but what the hell! """ chars = [] while 1: char = self.recv(1) if char in ['', '\n']: break chars.append(char) return "".join(chars)