diff --git a/apps/sam/code.leo b/apps/sam/code.leo
index 52cd3703f6..8869595259 100644
--- a/apps/sam/code.leo
+++ b/apps/sam/code.leo
@@ -1,8 +1,8 @@
-
-
-
+
+
+
@@ -13,9 +13,9 @@
I2P SAM Server and Client
-@file jython/src/i2psam.py
-imports
-globals
+@file jython/src/i2psam.py
+imports
+globals
I2CP Interface Classes
class JavaWrapper
class I2PDestination
@@ -110,6 +110,7 @@
on_DEST
on_message
threadSocketListener
+threadSocketReceiver
samParse
samSend
samCreateArgsList
@@ -129,18 +130,18 @@
takeKey
log
logException
-usage
-main
-
Tests
testdests
testsigs
testsession
testsocket
+usage
+main
+
MAINLINE
-@file python/src/i2psamclient.py
+@file python/src/i2psamclient.py
imports
globals
exceptions
@@ -161,6 +162,10 @@
samDatagramSend
samDatagramCheck
samDatagramReceive
+samStreamConnect
+samStreamAccept
+samStreamSend
+samStreamClose
samNamingLookup
samParse
samSend
@@ -177,12 +182,22 @@
on_DEST
Utility Methods
+samAllocId
_recvline
_recvbytes
_sendbytes
_sendline
+class I2PSAMStream
+__init__
+send
+recv
+readline
+close
+__del__
+_notifyIncomingData
+
class I2PRemoteSession
__init__
send
@@ -196,6 +211,7 @@
demoRAW
demoDATAGRAM
demoSTREAM
+demoSTREAM_thread
demo
MAINLINE
@@ -244,15 +260,13 @@ import java
# i2p-specific imports
import net.i2p
import net.i2p.client # to shut up epydoc
-
-# shut up java with a few more imports
+#import net.i2p.client.I2PClient
+#import net.i2p.client.I2PClientFactory
+#import net.i2p.client.I2PSessionListener
+import net.i2p.client.naming
import net.i2p.client.streaming
import net.i2p.crypto
import net.i2p.data
-import net.i2p.client.I2PClient
-import net.i2p.client.I2PClientFactory
-import net.i2p.client.naming
-#import net.i2p.client.I2PSessionListener
# handy shorthand refs
i2p = net.i2p
@@ -285,7 +299,7 @@ i2cpPort = 7654
# logging settings
# 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful
-verbosity = 5
+verbosity = 2
# change to a filename to log there instead
logfile = sys.stdout
@@ -1017,23 +1031,28 @@ port = i2cpPort
self.dest = dest
if kw.has_key('sock') \
- and kw.has_key('dest') \
and kw.has_key('remdest') \
and kw.has_key('instream') \
and kw.has_key('outstream'):
+
# wrapping an accept()'ed connection
+ log(4, "accept()'ed a connection, wrapping...")
+
self.sock = kw['sock']
- self.dest = kw['dest']
+ self.dest = dest
self.remdest = kw['remdest']
self.instream = kw['instream']
self.outstream = kw['outstream']
else:
+ log(4, "creating new I2PSocket %s" % dest)
+
# process keywords
self.host = kw.get('host', self.host)
self.port = int(kw.get('port', self.port))
# we need a factory, don't we?
self.sockmgrFact = i2p.client.streaming.I2PSocketManagerFactory()
+
def bind(self, dest=None):
"""
@@ -1047,7 +1066,8 @@ port = i2cpPort
self.dest = dest
elif not self.dest:
# create new dest, client should interrogate it at some time
- self.dest = Destination()
+ log(4, "bind: socket has no dest, creating one")
+ self.dest = I2PDestination()
def listen(self, *args, **kw):
"""
@@ -1058,6 +1078,8 @@ port = i2cpPort
raise I2PSocketError(".sockmgr already present - have you already called listen?")
if not self.dest:
raise I2PSocketError("socket is not bound to a destination")
+
+ log(4, "listening on socket")
# create the socket manager
self._createSockmgr()
@@ -1090,6 +1112,10 @@ port = i2cpPort
def connect(self, remdest):
"""
Connects to a remote destination
+
+ This has one totally major difference from the normal socket
+ paradigm, and that is that you can have n outbound connections
+ to different dests.
"""
# sanity check
if self.sockmgr:
@@ -1097,6 +1123,7 @@ port = i2cpPort
# create whole new dest if none was provided to constructor
if self.dest is None:
+ log(4, "connect: creating whole new dest")
self.dest = I2PDestination()
# create the socket manager
@@ -1107,13 +1134,22 @@ port = i2cpPort
opts = net.i2p.client.streaming.I2PSocketOptions()
try:
- self.sock = self.sockmgr.connect(remdest._item, opts)
+ log(4, "trying to connect to %s" % remdest.toBase64())
+ sock = self.sock = self.sockmgr.connect(remdest._item, opts)
self.remdest = remdest
except:
logException(2, "apparent exception, continuing...")
- self.instream = self.sock.getInputStream()
- self.outstream = self.sock.getOutputStream()
+
+ self.instream = sock.getInputStream()
+ self.outstream = sock.getOutputStream()
+
+ sockobj = I2PSocket(dest=self.dest,
+ remdest=remdest,
+ sock=sock,
+ instream=self.instream,
+ outstream=self.outstream)
self._connected = 1
+ return sockobj
def recv(self, nbytes):
"""
@@ -1150,22 +1186,26 @@ port = i2cpPort
raise I2PSocketError("Socket is not connected")
# and write it out
- #print "send: writing '%s' to outstream..." % repr(buf)
+ log(4, "send: writing '%s' to outstream..." % repr(buf))
outstream = self.outstream
for c in buf:
outstream.write(ord(c))
# flush just in case
- #print "send: flushing..."
+ log(4, "send: flushing...")
self.outstream.flush()
- #print "send: done"
+ log(4, "send: done")
+
def available(self):
"""
Returns the number of bytes available for recv()
"""
- return self.sock.available()
+ #print "available: sock is %s" % repr(self.sock)
+
+ return self.instream.available()
+
def close(self):
@@ -1192,6 +1232,9 @@ port = i2cpPort
def _createSockmgr(self):
+ if getattr(self, 'sockmgr', None):
+ return
+
#options = {jI2PClient.PROP_TCP_HOST: self.host,
# jI2PClient.PROP_TCP_PORT: self.port}
options = {}
@@ -1328,6 +1371,9 @@ version = version
self.samSessionIsOpen = 0
self.samSessionStyle = ''
+ # localise the id allocator
+ self.samAllocId = self.server.samAllocId
+
# need a local sending lock
self.sendLock = threading.Lock()
@@ -1729,6 +1775,26 @@ version = version
else: # STREAM
# no need to create session object, because we're using streaming api
+ log(4, "Creating STREAM session")
+
+ # what kind of stream?
+ direction = args.get('DIRECTION', 'BOTH')
+ if direction not in ['BOTH', 'RECEIVE', 'CREATE']:
+ self.samSend("SESSION", "STATUS",
+ RESULT="I2P_ERROR",
+ MESSAGE="Illegal_direction_keyword_%s" % direction.replace(" ","_"),
+ )
+ return
+
+ if direction == 'BOTH':
+ self.canConnect = 1
+ self.canAccept = 1
+ elif direction == 'RECEIVE':
+ self.canConnect = 0
+ self.canAccept = 1
+ elif direction == 'CREATE':
+ self.canConnect = 1
+ self.canAccept = 0
# but we do need to mark it as being in use
localsessions[destb64] = globalsessions[destb64] = None
@@ -1736,8 +1802,9 @@ version = version
# make a local socket
sock = self.samSock = I2PSocket(dest)
- # and we also need to fire up a socket listener
- thread.start_new_thread(self.threadSocketListener, (sock, dest))
+ # and we also need to fire up a socket listener, if not CREATE-only
+ if self.canAccept:
+ thread.start_new_thread(self.threadSocketListener, (sock, dest))
# finally, we can reply with the good news
self.samSend("SESSION", "STATUS",
@@ -1784,21 +1851,40 @@ version = version
if subtopic == 'CONNECT':
# who are we connecting to again?
- remdest = I2PDestionation(b64=args['DESTINATION'])
- id = args['ID']
+ remdest = I2PDestination(base64=args['DESTINATION'])
+ id = int(args['ID'])
try:
- self.samSock.connect(remdest)
+ log(4, "Trying to connect to remote peer %s..." % args['DESTINATION'])
+ sock = self.samSock.connect(remdest)
+ log(4, "Connected to remote peer %s..." % args['DESTINATION'])
+ self.localstreams[id] = sock
self.samSend("STREAM", "STATUS",
RESULT='OK',
ID=id,
)
+ thread.start_new_thread(self.threadSocketReceiver, (sock, id))
+
except:
+ log(4, "Failed to connect to remote peer %s..." % args['DESTINATION'])
self.samSend("STREAM", "STATUS",
RESULT='I2P_ERROR',
- MESSAGE='exception on connect',
+ MESSAGE='exception_on_connect',
+ ID=id,
)
+ elif subtopic == 'SEND':
+ # send to someone
+ id = int(args['ID'])
+ try:
+ sock = self.localstreams[id]
+ sock.send(args['DATA'])
+ except:
+ logException(4, "send failed")
+
+
+
+
def on_DATAGRAM(self, topic, subtopic, args):
r"""
@@ -1953,15 +2039,24 @@ version = version
"""
destb64 = dest.toBase64()
- log(4, "Listening for connections to %s..." % destb64[:40])
+ log(4, "Listening for connections to %s..." % destb64)
+
+ sock.bind()
+ sock.listen()
+
while 1:
+ log(4, "Awaiting next connection to %s..." % destb64)
newsock = sock.accept()
-
+ log(4, "Got connection to %s..." % destb64)
+
# need an id, negative
id = - self.server.samAllocId()
# register it in local and global streams
self.localstreams[id] = self.globalstreams[id] = newsock
+
+ # fire up the receiver thread
+ thread.start_new_thread(self.threadSocketReceiver, (newsock, id))
# who is connected to us?
remdest = newsock.remdest
@@ -1971,6 +2066,7 @@ version = version
self.samSend("STREAM", "CONNECTED",
DESTINATION=remdest_b64,
ID=id)
+
def samParse(self, flds):
"""
@@ -1988,7 +2084,7 @@ version = version
try:
name, val = arg.split("=", 1)
except:
- logException(3, "failed to process %s" % repr(arg))
+ logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
raise
dargs[name] = val
@@ -2224,18 +2320,20 @@ class I2PSocketError(Exception):
print " (run with no commands to launch SAM server)"
print " samserver - runs as a SAM server"
print " test - run a suite of self-tests"
+ print " testsocket - run only the socket test"
+ print " testbidirsocket - run socket test in bidirectional mode"
print
sys.exit(0)
-
-
-
def main():
argv = sys.argv
argc = len(argv)
+ # -------------------------------------------------
+ # do the getopt command line parsing
+
try:
opts, args = getopt.getopt(sys.argv[1:],
"h?vV:H:P:",
@@ -2247,14 +2345,9 @@ class I2PSocketError(Exception):
traceback.print_exc(file=sys.stdout)
usage("You entered an invalid option")
- cmd = 'samserver'
+ #print "args=%s" % args
- # we prolly should pass all these parms in constructor call, but
- # what the heck!
- #global verbosity, i2psamhost, i2psamport, i2cpHost, i2cpPort
-
serveropts = {}
-
for opt, val in opts:
if opt in ['-h', '-?', '--help']:
usage(1)
@@ -2274,6 +2367,9 @@ class I2PSocketError(Exception):
else:
usage(0)
+ # --------------------------------------------------
+ # now run in required mode, default is 'samserver'
+
if len(args) == 0:
cmd = 'samserver'
else:
@@ -2287,14 +2383,25 @@ class I2PSocketError(Exception):
elif cmd == 'test':
- print "RUNNING I2P Jython TESTS"
+ print "RUNNING full I2PSAM Jython TEST SUITE"
testsigs()
testdests()
testsession()
testsocket()
+ elif cmd == 'testsocket':
+
+ print "RUNNING SOCKET TEST"
+ testsocket(0)
+
+ elif cmd == 'testbidirsocket':
+ print "RUNNING BIDIRECTIONAL SOCKET TEST"
+ testsocket(1)
+
else:
+ # spit at unrecognised option
usage(0)
+
def testdests():
@@ -2441,7 +2548,7 @@ class I2PSocketError(Exception):
print "session tests passed!"
-def testsocket():
+def testsocket(bidirectional=0):
global d1, d2, s1, s2
@@ -2501,6 +2608,19 @@ class I2PSocketError(Exception):
print "launching server thread..."
thread.start_new_thread(servThread, (sServer,))
+ if bidirectional:
+ # dummy thread which accepts connections TO client socket
+ def threadDummy(s):
+ print "dummy: listening"
+ s.listen()
+ print "dummy: accepting"
+
+ sock = s.accept()
+ print "dummy: got connection"
+
+ print "test - launching dummy client accept thread"
+ thread.start_new_thread(threadDummy, (sClient,))
+
print "client: trying to connect"
sClient.connect(dServer)
@@ -2517,6 +2637,7 @@ class I2PSocketError(Exception):
print "I2PSocket test apparently succeeded"
+
if __name__ == '__main__':
main()
@@ -2539,6 +2660,8 @@ Run this module without arguments to see a demo in action
import sys, os, socket, thread, threading, Queue, traceback, StringIO, time
+from pdb import set_trace
+
# -----------------------------------------
# server access settings
@@ -2573,6 +2696,11 @@ class I2PCommandFail(Exception):
A failure in an I2CP command
"""
pass
+
+class I2PStreamClosed(Exception):
+ """
+ Stream is not open
+ """
class I2PSamClient:
"""
@@ -2628,10 +2756,20 @@ i2cpPort = None
self.qSession = Queue.Queue()
self.qDatagrams = Queue.Queue()
self.qRawMessages = Queue.Queue()
+
self.namingReplies = {}
self.namingCache = {}
+
+ self.streams = {} # currently open streams, keyed by id
+ self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id
+ self.qNewStreams = Queue.Queue() # incoming connections
+
+ self.samNextIdLock = threading.Lock()
+ self.samNextId = 1
+
self.isRunning = 1
+
log(4, "trying connection to SAM server...")
try:
self.sock.connect((self.host, self.port))
@@ -2825,6 +2963,8 @@ i2cpPort = None
- dest - base64 private destination
Keywords:
+ - direction - only used for STREAM sessions, can be RECEIVE,
+ CREATE or BOTH (default BOTH)
- i2cphost - hostname for the SAM bridge to contact i2p router on
- i2cpport - port for the SAM bridge to contact i2p router on
@@ -2835,6 +2975,20 @@ i2cpPort = None
kw1 = dict(kw)
kw1['STYLE'] = self.samStyle = style
kw1['DESTINATION'] = dest
+ if style == 'STREAM':
+ direction = kw.get('direction', 'BOTH')
+ kw1['DIRECTION'] = direction
+ if direction == 'BOTH':
+ self.canAccept = 1
+ self.canConnect = 1
+ elif direction == 'RECEIVE':
+ self.canAccept = 1
+ self.canConnect = 0
+ elif direction == 'CREATE':
+ self.canAccept = 0
+ self.canConnect = 1
+ else:
+ raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH")
# stick in i2cp host/port if specified
if kw.has_key('i2cphost'):
@@ -2974,7 +3128,7 @@ i2cpPort = None
try:
name, val = arg.split("=", 1)
except:
- logException(3, "failed to process %s" % repr(arg))
+ logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
raise
dargs[name] = val
@@ -2990,6 +3144,8 @@ i2cpPort = None
+
+
def samSend(self, topic, subtopic, data=None, **kw):
"""
@@ -3083,7 +3239,90 @@ i2cpPort = None
def on_STREAM(self, topic, subtopic, args):
"""
Handles STREAM messages from server
+
+ STREAM STATUS
+ RESULT=$result
+ ID=$id
+ [MESSAGE=...]
+
+ STREAM CONNECTED
+ DESTINATION=$base64key
+ ID=$id
+
+ STREAM RECEIVED
+ ID=$id
+ SIZE=$numBytes\n[$numBytes of data]
+
+ STREAM CLOSED
+ RESULT=$result
+ ID=$id
+ [MESSAGE=...]
"""
+ log(4, "got %s %s %s" % (topic, subtopic, args))
+
+ # which stream?
+ id = int(args['ID'])
+
+ # result of prior connection attempt
+ if subtopic == 'STATUS':
+ # stick it on the queue that the caller is waiting on and let the
+ # caller interpret the result
+ self.streamConnectReplies[id].put(args)
+ return
+
+ # notice of incoming connection
+ if subtopic == 'CONNECTED':
+
+ # grab details
+ dest = args['DESTINATION']
+
+ # wrap it in a stream obj
+ conn = I2PSAMStream(self, id, dest)
+ self.streams[id] = conn
+
+ # and put it there for anyone calling samStreamAccept()
+ self.qNewStreams.put(conn)
+
+ # done
+ return
+
+ # notice of received data
+ elif subtopic == 'RECEIVED':
+ # grab details
+ data = args['DATA']
+
+ # lookup the connection
+ conn = self.streams.get(id, None)
+ if not conn:
+ # conn not known, just ditch
+ log(2, "got data, but don't recall any conn with id %s" % id)
+ return
+
+ # and post the received data
+ conn._notifyIncomingData(data)
+
+ log(4, "wrote data to conn's inbound queue")
+
+ # done
+ return
+
+ elif subtopic == 'CLOSED':
+ # lookup the connection
+ conn = self.streams.get(id, None)
+ if not conn:
+ # conn not known, just ditch
+ return
+
+ # mark conn as closed and forget it
+ conn._notifyIncomingData("") # special signal to close
+ conn.isOpen = 0
+ del self.streams[id]
+
+ # done
+ return
+
+
+
def on_DATAGRAM(self, topic, subtopic, args):
"""
@@ -3187,7 +3426,8 @@ i2cpPort = None
self.dest = dest
def send(self, peerdest, msg):
-
+ """
+ """
return self.client.send(self.dest, peerdest, msg)
def receive(self):
@@ -3360,35 +3600,59 @@ i2cpPort = None
print "Starting SAM STREAM demo..."
print
- print "Instantiating 2 more client connections..."
- c5 = I2PSamClient()
+ print "Instantiating client c6..."
c6 = I2PSamClient()
- print "Creating more dests via SAM"
- pub5, priv5 = c5.samDestGenerate()
+ print "Creating dest for c6"
pub6, priv6 = c6.samDestGenerate()
- print "Creating SAM STREAM SESSION on connection c3..."
- res = c5.samSessionCreate("STREAM", priv5)
+ print "Creating SAM STREAM SESSION on connection c6..."
+ res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE")
+ if res != 'OK':
+ print "Failed to create STREAM session on connection c6: %s" % repr(res)
+ return
+ print "STREAM Session on connection c6 created successfully"
+
+ print "Launching acceptor thread..."
+ thread.start_new_thread(demoSTREAM_thread, (c6,))
+
+ #print "sleep a while and give the server a chance..."
+ #time.sleep(10)
+
+ print "----------------------------------------"
+
+ print "Instantiating client c5..."
+ c5 = I2PSamClient()
+
+ print "Creating dest for c5"
+ pub5, priv5 = c5.samDestGenerate()
+
+ print "Creating SAM STREAM SESSION on connection c5..."
+ res = c5.samSessionCreate("STREAM", priv5, direction="CREATE")
if res != 'OK':
print "Failed to create STREAM session on connection c5: %s" % repr(res)
return
print "STREAM Session on connection c5 created successfully"
- print "Creating SAM STREAM SESSION on connection c6..."
- res = c6.samSessionCreate("STREAM", priv6)
- if res != 'OK':
- print "Failed to create STREAM session on connection c4: %s" % repr(res)
+ print "----------------------------------------"
+
+ print "Making connection from c5 to c6..."
+
+ #set_trace()
+
+ try:
+ conn_c5 = c5.samStreamConnect(pub6)
+ except:
+ print "Stream Connection failed"
return
- print "STREAM Session on connection c4 created successfully"
+ print "Stream connect succeeded"
+
+ print "Receiving from c5..."
+ buf = conn_c5.readline()
+ print "Got %s" % repr(buf)
- msg = "Hi there, this is a datagram!"
- print "sending from c5 to c6: %s" % repr(msg)
- c5.samStreamSend(pub6, msg)
-
- print "now try to receive from c6 (will block)..."
- msg1 = c6.samStreamReceive()
- print "Connection c6 got %s from %s..." % (repr(msg1), repr(remdest))
+ #print "Try to accept connection on c6..."
+ #conn_c6 = c6.sam
print
print "--------------------------------------"
@@ -3396,6 +3660,10 @@ i2cpPort = None
print "--------------------------------------"
print
+
+
+
+
def demo():
"""
@@ -3410,10 +3678,10 @@ i2cpPort = None
print "-----------------------------------------"
print
- demoNAMING()
- demoRAW()
- demoDATAGRAM()
- #demoSTREAM()
+ #demoNAMING()
+ #demoRAW()
+ #demoDATAGRAM()
+ demoSTREAM()
print
print "-----------------------------------------"
@@ -3426,5 +3694,264 @@ i2cpPort = None
demo()
+def samStreamConnect(self, dest):
+ """
+ Makes a STREAM connection to a remote dest
+
+ STREAM STATUS
+ RESULT=$result
+ ID=$id
+ [MESSAGE=...]
+ """
+ # need an ID
+ id = self.samAllocId()
+
+ # create queue for connect reply
+ q = self.streamConnectReplies[id] = Queue.Queue()
+
+ # send req
+ self.samSend("STREAM", "CONNECT",
+ ID=id,
+ DESTINATION=dest,
+ )
+
+ # await reply - comes back as a dict
+ resp = q.get()
+
+ # ditch queue
+ del self.streamConnectReplies[id]
+ del q
+
+ # check out response
+ result = resp['RESULT']
+ if result == 'OK':
+ conn = I2PSAMStream(self, id, dest)
+ self.streams[id] = conn
+ return conn
+ else:
+ msg = resp.get('MESSAGE', '')
+ raise I2PCommandFail(result, msg, "STREAM CONNECT")
+
+
+def samAllocId(self):
+ """
+ Allocates a new unique id as required by SAM protocol
+ """
+ self.samNextIdLock.acquire()
+ id = self.samNextId
+ self.samNextId += 1
+ self.samNextIdLock.release()
+ return id
+
+class I2PSAMStream:
+ """
+ Wrapper for a stream object
+ """
+ @others
+
+def __init__(self, client, id, dest):
+ """
+ """
+ self.client = client
+ self.id = id
+ self.dest = dest
+
+ self.qIncomingData = Queue.Queue()
+
+ self.inbuf = ''
+ self.isOpen = 1
+
+def _notifyIncomingData(self, data):
+ """
+ Called by client receiver to notify incoming data
+ """
+ log(4, "got %s" % repr(data))
+ self.qIncomingData.put(data)
+
+def samStreamSend(self, conn, data):
+ """
+ DO NOT CALL THIS DIRECTLY
+
+ Invoked by an I2PSAMStream object to transfer data
+ Use the object's .send() method instead.
+
+ conn is the I2PSAMStream
+
+ STREAM SEND
+ ID=$id
+ SIZE=$numBytes\n[$numBytes of data]
+ """
+ # dispatch
+ self.samSend("STREAM", "SEND", data, ID=conn.id)
+
+ # useless, but mimics socket paradigm
+ return len(data)
+
+
+def send(self, data):
+ """
+ Sends data to a stream connection
+ """
+ # barf if stream not open
+ if not self.isOpen:
+ raise I2PStreamClosed
+
+ # can send
+ return self.client.samStreamSend(self, data)
+
+def samStreamClose(self, conn):
+ """
+ DO NOT CALL DIRECTLY
+
+ Invoked by I2PSAMStream to close stream
+ Use the object's .send() method instead.
+
+ STREAM CLOSE
+ ID=$id
+ """
+ self.samSend("STREAM", "CLOSE", ID=conn.id)
+ del self.streams[conn.id]
+
+
+def recv(self, size):
+ """
+ Retrieves n bytes from peer
+ """
+ chunks = []
+
+ while self.isOpen and size > 0:
+ # try internal buffer first
+ if self.inbuf:
+ chunk = self.inbuf[:size]
+ chunklen = len(chunk)
+ self.inbuf = self.inbuf[chunklen:]
+ chunks.append(chunk)
+ size -= chunklen
+ else:
+ # replenish input buffer
+ log(4, "I2PSAMStream.recv: replenishing input buffer")
+ buf = self.qIncomingData.get()
+ if buf == '':
+ # connection closed by peer
+ self.isOpen = 0
+ break
+ else:
+ # got more data
+ log(4, "I2PSAMStream: queue returned %s" % repr(buf))
+ self.inbuf += buf
+
+ # return whatever we've got, hopefully all
+ return "".join(chunks)
+
+
+
+def close(self):
+ """
+ close this stream connection
+ """
+ log(4, "closing stream")
+ self.client.samStreamClose(self)
+ log(4, "stream closed")
+ self.isOpen = 0
+
+ # and just to make sure...
+ self.qIncomingData.put("") # busts out of recv() loops
+
+
+def __del__(self):
+ """
+ Dropping last ref to this object closes stream
+ """
+ self.close()
+
+def demoSTREAM_thread(sess):
+
+ while 1:
+ sock = sess.samStreamAccept()
+ log(4, "got incoming connection")
+
+ print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING"
+
+ time.sleep(10)
+
+ sock.send("Hi there, what do you want?\n")
+
+ print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING"
+ time.sleep(300)
+ print "**ACCEPTOR CLOSING STREAM"
+
+ sock.close()
+
+
+def samStreamAccept(self):
+ """
+ Waits for an incoming connection, returning a wrapped conn obj
+ """
+ log(4, "waiting for connection")
+ conn = self.qNewStreams.get()
+ log(4, "got connection")
+ return conn
+
+def threadSocketReceiver(self, sock, id):
+ """
+ One of these gets launched each time a new stream connection
+ is created. Due to the lack of callback mechanism within the
+ ministreaming API, we have to actively poll for and send back
+ received data
+ """
+ while 1:
+ #avail = sock.available()
+ #if avail <= 0:
+ # print "threadSocketReceiver: waiting for data on %s (%s avail)..." % (id, avail)
+ # time.sleep(5)
+ # continue
+ #log(4, "reading a byte")
+
+ try:
+ buf = sock.recv(1)
+ except:
+ logException(4, "Exception reading first byte")
+
+ if buf == '':
+ log(4, "stream closed")
+
+ # notify a close
+ self.samSend("STREAM", "CLOSED",
+ ID=id)
+ return
+
+ # grab more if there's any available
+ navail = sock.available()
+ if navail > 0:
+ #log(4, "%d more bytes available, reading..." % navail)
+ rest = sock.recv(navail)
+ buf += rest
+
+ # send if off
+ log(4, "got from peer: %s" % repr(buf))
+
+ self.samSend("STREAM", "RECEIVED", buf,
+ ID=id,
+ )
+
+
+
+
+
+def readline(self):
+ """
+ Read a line of text from stream, return the line without trailing newline
+
+ This method really shouldn't exist in a class that's trying to look a bit
+ like a socket object, but what the hell!
+ """
+ chars = []
+ while 1:
+ char = self.recv(1)
+ if char in ['', '\n']:
+ break
+ chars.append(char)
+ return "".join(chars)
+
diff --git a/apps/sam/jython/build.xml b/apps/sam/jython/build.xml
index fed7912659..4a7656152f 100644
--- a/apps/sam/jython/build.xml
+++ b/apps/sam/jython/build.xml
@@ -11,17 +11,25 @@
-
-
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
-
+
diff --git a/apps/sam/jython/src/i2psam.py b/apps/sam/jython/src/i2psam.py
index b5ad210a93..fef0b482ee 100644
--- a/apps/sam/jython/src/i2psam.py
+++ b/apps/sam/jython/src/i2psam.py
@@ -79,7 +79,7 @@ i2cpPort = 7654
# logging settings
# 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful
-verbosity = 5
+verbosity = 2
# change to a filename to log there instead
logfile = sys.stdout
@@ -851,23 +851,28 @@ class I2PSocket:
self.dest = dest
if kw.has_key('sock') \
- and kw.has_key('dest') \
and kw.has_key('remdest') \
and kw.has_key('instream') \
and kw.has_key('outstream'):
+
# wrapping an accept()'ed connection
+ log(4, "accept()'ed a connection, wrapping...")
+
self.sock = kw['sock']
- self.dest = kw['dest']
+ self.dest = dest
self.remdest = kw['remdest']
self.instream = kw['instream']
self.outstream = kw['outstream']
else:
+ log(4, "creating new I2PSocket %s" % dest)
+
# process keywords
self.host = kw.get('host', self.host)
self.port = int(kw.get('port', self.port))
# we need a factory, don't we?
self.sockmgrFact = i2p.client.streaming.I2PSocketManagerFactory()
+
#@-node:__init__
#@+node:bind
def bind(self, dest=None):
@@ -882,7 +887,8 @@ class I2PSocket:
self.dest = dest
elif not self.dest:
# create new dest, client should interrogate it at some time
- self.dest = Destination()
+ log(4, "bind: socket has no dest, creating one")
+ self.dest = I2PDestination()
#@-node:bind
#@+node:listen
def listen(self, *args, **kw):
@@ -894,6 +900,8 @@ class I2PSocket:
raise I2PSocketError(".sockmgr already present - have you already called listen?")
if not self.dest:
raise I2PSocketError("socket is not bound to a destination")
+
+ log(4, "listening on socket")
# create the socket manager
self._createSockmgr()
@@ -936,11 +944,12 @@ class I2PSocket:
to different dests.
"""
# sanity check
- #if self.sockmgr:
- # raise I2PSocketError(".sockmgr already present - have you already called listen/connect?")
+ if self.sockmgr:
+ raise I2PSocketError(".sockmgr already present - have you already called listen/connect?")
# create whole new dest if none was provided to constructor
if self.dest is None:
+ log(4, "connect: creating whole new dest")
self.dest = I2PDestination()
# create the socket manager
@@ -951,6 +960,7 @@ class I2PSocket:
opts = net.i2p.client.streaming.I2PSocketOptions()
try:
+ log(4, "trying to connect to %s" % remdest.toBase64())
sock = self.sock = self.sockmgr.connect(remdest._item, opts)
self.remdest = remdest
except:
@@ -962,8 +972,8 @@ class I2PSocket:
sockobj = I2PSocket(dest=self.dest,
remdest=remdest,
sock=sock,
- instream=instream,
- outstream=outstream)
+ instream=self.instream,
+ outstream=self.outstream)
self._connected = 1
return sockobj
#@-node:connect
@@ -1004,23 +1014,27 @@ class I2PSocket:
raise I2PSocketError("Socket is not connected")
# and write it out
- #print "send: writing '%s' to outstream..." % repr(buf)
+ log(4, "send: writing '%s' to outstream..." % repr(buf))
outstream = self.outstream
for c in buf:
outstream.write(ord(c))
# flush just in case
- #print "send: flushing..."
+ log(4, "send: flushing...")
self.outstream.flush()
- #print "send: done"
+ log(4, "send: done")
+
#@-node:send
#@+node:available
def available(self):
"""
Returns the number of bytes available for recv()
"""
- return self.sock.available()
+ #print "available: sock is %s" % repr(self.sock)
+
+ return self.instream.available()
+
#@-node:available
#@+node:close
@@ -1049,6 +1063,9 @@ class I2PSocket:
#@+node:_createSockmgr
def _createSockmgr(self):
+ if getattr(self, 'sockmgr', None):
+ return
+
#options = {jI2PClient.PROP_TCP_HOST: self.host,
# jI2PClient.PROP_TCP_PORT: self.port}
options = {}
@@ -1606,6 +1623,26 @@ class I2PSamClientHandler(StreamRequestHandler):
else: # STREAM
# no need to create session object, because we're using streaming api
+ log(4, "Creating STREAM session")
+
+ # what kind of stream?
+ direction = args.get('DIRECTION', 'BOTH')
+ if direction not in ['BOTH', 'RECEIVE', 'CREATE']:
+ self.samSend("SESSION", "STATUS",
+ RESULT="I2P_ERROR",
+ MESSAGE="Illegal_direction_keyword_%s" % direction.replace(" ","_"),
+ )
+ return
+
+ if direction == 'BOTH':
+ self.canConnect = 1
+ self.canAccept = 1
+ elif direction == 'RECEIVE':
+ self.canConnect = 0
+ self.canAccept = 1
+ elif direction == 'CREATE':
+ self.canConnect = 1
+ self.canAccept = 0
# but we do need to mark it as being in use
localsessions[destb64] = globalsessions[destb64] = None
@@ -1613,8 +1650,9 @@ class I2PSamClientHandler(StreamRequestHandler):
# make a local socket
sock = self.samSock = I2PSocket(dest)
- # and we also need to fire up a socket listener
- thread.start_new_thread(self.threadSocketListener, (sock, dest))
+ # and we also need to fire up a socket listener, if not CREATE-only
+ if self.canAccept:
+ thread.start_new_thread(self.threadSocketListener, (sock, dest))
# finally, we can reply with the good news
self.samSend("SESSION", "STATUS",
@@ -1663,22 +1701,39 @@ class I2PSamClientHandler(StreamRequestHandler):
if subtopic == 'CONNECT':
# who are we connecting to again?
- remdest = I2PDestionation(b64=args['DESTINATION'])
- id = args['ID']
+ remdest = I2PDestination(base64=args['DESTINATION'])
+ id = int(args['ID'])
try:
+ log(4, "Trying to connect to remote peer %s..." % args['DESTINATION'])
sock = self.samSock.connect(remdest)
+ log(4, "Connected to remote peer %s..." % args['DESTINATION'])
self.localstreams[id] = sock
self.samSend("STREAM", "STATUS",
RESULT='OK',
ID=id,
)
+ thread.start_new_thread(self.threadSocketReceiver, (sock, id))
+
except:
+ log(4, "Failed to connect to remote peer %s..." % args['DESTINATION'])
self.samSend("STREAM", "STATUS",
RESULT='I2P_ERROR',
MESSAGE='exception_on_connect',
+ ID=id,
)
+ elif subtopic == 'SEND':
+ # send to someone
+ id = int(args['ID'])
+ try:
+ sock = self.localstreams[id]
+ sock.send(args['DATA'])
+ except:
+ logException(4, "send failed")
+
+
+
#@-node:on_STREAM
#@+node:on_DATAGRAM
@@ -1840,18 +1895,24 @@ class I2PSamClientHandler(StreamRequestHandler):
"""
destb64 = dest.toBase64()
- log(4, "Listening for connections to %s..." % destb64[:40])
+ log(4, "Listening for connections to %s..." % destb64)
+ sock.bind()
sock.listen()
while 1:
+ log(4, "Awaiting next connection to %s..." % destb64)
newsock = sock.accept()
-
+ log(4, "Got connection to %s..." % destb64)
+
# need an id, negative
id = - self.server.samAllocId()
# register it in local and global streams
self.localstreams[id] = self.globalstreams[id] = newsock
+
+ # fire up the receiver thread
+ thread.start_new_thread(self.threadSocketReceiver, (newsock, id))
# who is connected to us?
remdest = newsock.remdest
@@ -1861,7 +1922,55 @@ class I2PSamClientHandler(StreamRequestHandler):
self.samSend("STREAM", "CONNECTED",
DESTINATION=remdest_b64,
ID=id)
+
#@-node:threadSocketListener
+ #@+node:threadSocketReceiver
+ def threadSocketReceiver(self, sock, id):
+ """
+ One of these gets launched each time a new stream connection
+ is created. Due to the lack of callback mechanism within the
+ ministreaming API, we have to actively poll for and send back
+ received data
+ """
+ while 1:
+ #avail = sock.available()
+ #if avail <= 0:
+ # print "threadSocketReceiver: waiting for data on %s (%s avail)..." % (id, avail)
+ # time.sleep(5)
+ # continue
+ #log(4, "reading a byte")
+
+ try:
+ buf = sock.recv(1)
+ except:
+ logException(4, "Exception reading first byte")
+
+ if buf == '':
+ log(4, "stream closed")
+
+ # notify a close
+ self.samSend("STREAM", "CLOSED",
+ ID=id)
+ return
+
+ # grab more if there's any available
+ navail = sock.available()
+ if navail > 0:
+ #log(4, "%d more bytes available, reading..." % navail)
+ rest = sock.recv(navail)
+ buf += rest
+
+ # send if off
+ log(4, "got from peer: %s" % repr(buf))
+
+ self.samSend("STREAM", "RECEIVED", buf,
+ ID=id,
+ )
+
+
+
+
+ #@-node:threadSocketReceiver
#@+node:samParse
def samParse(self, flds):
"""
@@ -1879,7 +1988,7 @@ class I2PSamClientHandler(StreamRequestHandler):
try:
name, val = arg.split("=", 1)
except:
- logException(3, "failed to process %s" % repr(arg))
+ logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
raise
dargs[name] = val
@@ -2108,106 +2217,6 @@ def logException(level, msg=''):
traceback.print_exc(file=s)
log(level, "%s\n%s" % (s.getvalue(), msg), 1)
#@-node:logException
-#@+node:usage
-def usage(detailed=0):
-
- print "Usage: %s []" % sys.argv[0]
- if not detailed:
- print "Run with '-h' to get detailed help"
- sys.exit(0)
-
- print "I2PSAM is a bridge that allows I2P client programs to access the"
- print "I2P network by talking over a plaintext socket connection."
- print "References:"
- print " - http://www.freenet.org.nz/i2p - source, doco, downloadables"
- print " - http://drupal.i2p.net/node/view/144 - I2P SAM specification"
- print
- print "Options:"
- print " -h, -?, --help - display this help"
- print " -v, --version - print program version"
- print " -V, --verbosity=n - set verbosity to n, default 2, 1==quiet, 4==noisy"
- print " -H, --listenhost=host - specify host to listen on for client connections"
- print " -P, --listenport=port - port to listen on for client connections"
- print " --i2cphost=host - hostname of I2P router's I2CP interface"
- print " --i2cpport=port - port of I2P router's I2CP interface"
- print
- print "Commands:"
- print " (run with no commands to launch SAM server)"
- print " samserver - runs as a SAM server"
- print " test - run a suite of self-tests"
- print
-
- sys.exit(0)
-
-
-
-#@-node:usage
-#@+node:main
-def main():
-
- argv = sys.argv
- argc = len(argv)
-
- try:
- opts, args = getopt.getopt(sys.argv[1:],
- "h?vV:H:P:",
- ['help', 'version', 'verbosity=',
- 'listenhost=', 'listenport=',
- 'i2cphost=', 'i2cpport=',
- ])
- except:
- traceback.print_exc(file=sys.stdout)
- usage("You entered an invalid option")
-
- cmd = 'samserver'
-
- # we prolly should pass all these parms in constructor call, but
- # what the heck!
- #global verbosity, i2psamhost, i2psamport, i2cpHost, i2cpPort
-
- serveropts = {}
-
- for opt, val in opts:
- if opt in ['-h', '-?', '--help']:
- usage(1)
- elif opt in ['-v', '--version']:
- print "I2P SAM version %s" % version
- sys.exit(0)
- elif opt in ['-V', '--verbosity']:
- serveropts['verbosity'] = int(val)
- elif opt in ['-H', '--listenhost']:
- serveropts['host'] = val
- elif opt in ['-P', '--listenport']:
- serveropts['port'] = int(val)
- elif opt in ['--i2cphost']:
- serveropts['i2cphost'] = val
- elif opt in ['--i2cpport']:
- serveropts['i2cpport'] = int(val)
- else:
- usage(0)
-
- if len(args) == 0:
- cmd = 'samserver'
- else:
- cmd = args[0]
-
- if cmd == 'samserver':
-
- log(2, "Running I2P SAM Server...")
- server = I2PSamServer(**serveropts)
- server.run()
-
- elif cmd == 'test':
-
- print "RUNNING I2P Jython TESTS"
- testsigs()
- testdests()
- testsession()
- testsocket()
-
- else:
- usage(0)
-#@-node:main
#@+node:testdests
def testdests():
"""
@@ -2356,7 +2365,7 @@ def testsession():
print "session tests passed!"
#@-node:testsession
#@+node:testsocket
-def testsocket():
+def testsocket(bidirectional=0):
global d1, d2, s1, s2
@@ -2416,6 +2425,19 @@ def testsocket():
print "launching server thread..."
thread.start_new_thread(servThread, (sServer,))
+ if bidirectional:
+ # dummy thread which accepts connections TO client socket
+ def threadDummy(s):
+ print "dummy: listening"
+ s.listen()
+ print "dummy: accepting"
+
+ sock = s.accept()
+ print "dummy: got connection"
+
+ print "test - launching dummy client accept thread"
+ thread.start_new_thread(threadDummy, (sClient,))
+
print "client: trying to connect"
sClient.connect(dServer)
@@ -2432,7 +2454,119 @@ def testsocket():
print "I2PSocket test apparently succeeded"
+
#@-node:testsocket
+#@+node:usage
+def usage(detailed=0):
+
+ print "Usage: %s []" % sys.argv[0]
+ if not detailed:
+ print "Run with '-h' to get detailed help"
+ sys.exit(0)
+
+ print "I2PSAM is a bridge that allows I2P client programs to access the"
+ print "I2P network by talking over a plaintext socket connection."
+ print "References:"
+ print " - http://www.freenet.org.nz/i2p - source, doco, downloadables"
+ print " - http://drupal.i2p.net/node/view/144 - I2P SAM specification"
+ print
+ print "Options:"
+ print " -h, -?, --help - display this help"
+ print " -v, --version - print program version"
+ print " -V, --verbosity=n - set verbosity to n, default 2, 1==quiet, 4==noisy"
+ print " -H, --listenhost=host - specify host to listen on for client connections"
+ print " -P, --listenport=port - port to listen on for client connections"
+ print " --i2cphost=host - hostname of I2P router's I2CP interface"
+ print " --i2cpport=port - port of I2P router's I2CP interface"
+ print
+ print "Commands:"
+ print " (run with no commands to launch SAM server)"
+ print " samserver - runs as a SAM server"
+ print " test - run a suite of self-tests"
+ print " testsocket - run only the socket test"
+ print " testbidirsocket - run socket test in bidirectional mode"
+ print
+
+ sys.exit(0)
+#@-node:usage
+#@+node:main
+def main():
+
+ argv = sys.argv
+ argc = len(argv)
+
+ # -------------------------------------------------
+ # do the getopt command line parsing
+
+ try:
+ opts, args = getopt.getopt(sys.argv[1:],
+ "h?vV:H:P:",
+ ['help', 'version', 'verbosity=',
+ 'listenhost=', 'listenport=',
+ 'i2cphost=', 'i2cpport=',
+ ])
+ except:
+ traceback.print_exc(file=sys.stdout)
+ usage("You entered an invalid option")
+
+ #print "args=%s" % args
+
+ serveropts = {}
+ for opt, val in opts:
+ if opt in ['-h', '-?', '--help']:
+ usage(1)
+ elif opt in ['-v', '--version']:
+ print "I2P SAM version %s" % version
+ sys.exit(0)
+ elif opt in ['-V', '--verbosity']:
+ serveropts['verbosity'] = int(val)
+ elif opt in ['-H', '--listenhost']:
+ serveropts['host'] = val
+ elif opt in ['-P', '--listenport']:
+ serveropts['port'] = int(val)
+ elif opt in ['--i2cphost']:
+ serveropts['i2cphost'] = val
+ elif opt in ['--i2cpport']:
+ serveropts['i2cpport'] = int(val)
+ else:
+ usage(0)
+
+ # --------------------------------------------------
+ # now run in required mode, default is 'samserver'
+
+ if len(args) == 0:
+ cmd = 'samserver'
+ else:
+ cmd = args[0]
+
+ if cmd == 'samserver':
+
+ log(2, "Running I2P SAM Server...")
+ server = I2PSamServer(**serveropts)
+ server.run()
+
+ elif cmd == 'test':
+
+ print "RUNNING full I2PSAM Jython TEST SUITE"
+ testsigs()
+ testdests()
+ testsession()
+ testsocket()
+
+ elif cmd == 'testsocket':
+
+ print "RUNNING SOCKET TEST"
+ testsocket(0)
+
+ elif cmd == 'testbidirsocket':
+ print "RUNNING BIDIRECTIONAL SOCKET TEST"
+ testsocket(1)
+
+ else:
+ # spit at unrecognised option
+ usage(0)
+
+#@-node:main
#@+node:MAINLINE
if __name__ == '__main__':
main()
diff --git a/apps/sam/python/src/i2psamclient.py b/apps/sam/python/src/i2psamclient.py
index 03a2c8e6cd..2070bfd9ab 100644
--- a/apps/sam/python/src/i2psamclient.py
+++ b/apps/sam/python/src/i2psamclient.py
@@ -17,6 +17,8 @@ Run this module without arguments to see a demo in action
#@+node:imports
import sys, os, socket, thread, threading, Queue, traceback, StringIO, time
+from pdb import set_trace
+
#@-node:imports
#@+node:globals
# -----------------------------------------
@@ -53,6 +55,11 @@ class I2PCommandFail(Exception):
A failure in an I2CP command
"""
pass
+
+class I2PStreamClosed(Exception):
+ """
+ Stream is not open
+ """
#@-node:exceptions
#@+node:class I2PSamClient
class I2PSamClient:
@@ -110,10 +117,20 @@ class I2PSamClient:
self.qSession = Queue.Queue()
self.qDatagrams = Queue.Queue()
self.qRawMessages = Queue.Queue()
+
self.namingReplies = {}
self.namingCache = {}
+
+ self.streams = {} # currently open streams, keyed by id
+ self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id
+ self.qNewStreams = Queue.Queue() # incoming connections
+
+ self.samNextIdLock = threading.Lock()
+ self.samNextId = 1
+
self.isRunning = 1
+
log(4, "trying connection to SAM server...")
try:
self.sock.connect((self.host, self.port))
@@ -312,6 +329,8 @@ class I2PSamClient:
- dest - base64 private destination
Keywords:
+ - direction - only used for STREAM sessions, can be RECEIVE,
+ CREATE or BOTH (default BOTH)
- i2cphost - hostname for the SAM bridge to contact i2p router on
- i2cpport - port for the SAM bridge to contact i2p router on
@@ -322,6 +341,20 @@ class I2PSamClient:
kw1 = dict(kw)
kw1['STYLE'] = self.samStyle = style
kw1['DESTINATION'] = dest
+ if style == 'STREAM':
+ direction = kw.get('direction', 'BOTH')
+ kw1['DIRECTION'] = direction
+ if direction == 'BOTH':
+ self.canAccept = 1
+ self.canConnect = 1
+ elif direction == 'RECEIVE':
+ self.canAccept = 1
+ self.canConnect = 0
+ elif direction == 'CREATE':
+ self.canAccept = 0
+ self.canConnect = 1
+ else:
+ raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH")
# stick in i2cp host/port if specified
if kw.has_key('i2cphost'):
@@ -423,6 +456,92 @@ class I2PSamClient:
return None
return self.qDatagrams.get()
#@-node:samDatagramReceive
+ #@+node:samStreamConnect
+ def samStreamConnect(self, dest):
+ """
+ Makes a STREAM connection to a remote dest
+
+ STREAM STATUS
+ RESULT=$result
+ ID=$id
+ [MESSAGE=...]
+ """
+ # need an ID
+ id = self.samAllocId()
+
+ # create queue for connect reply
+ q = self.streamConnectReplies[id] = Queue.Queue()
+
+ # send req
+ self.samSend("STREAM", "CONNECT",
+ ID=id,
+ DESTINATION=dest,
+ )
+
+ # await reply - comes back as a dict
+ resp = q.get()
+
+ # ditch queue
+ del self.streamConnectReplies[id]
+ del q
+
+ # check out response
+ result = resp['RESULT']
+ if result == 'OK':
+ conn = I2PSAMStream(self, id, dest)
+ self.streams[id] = conn
+ return conn
+ else:
+ msg = resp.get('MESSAGE', '')
+ raise I2PCommandFail(result, msg, "STREAM CONNECT")
+
+ #@-node:samStreamConnect
+ #@+node:samStreamAccept
+ def samStreamAccept(self):
+ """
+ Waits for an incoming connection, returning a wrapped conn obj
+ """
+ log(4, "waiting for connection")
+ conn = self.qNewStreams.get()
+ log(4, "got connection")
+ return conn
+ #@-node:samStreamAccept
+ #@+node:samStreamSend
+ def samStreamSend(self, conn, data):
+ """
+ DO NOT CALL THIS DIRECTLY
+
+ Invoked by an I2PSAMStream object to transfer data
+ Use the object's .send() method instead.
+
+ conn is the I2PSAMStream
+
+ STREAM SEND
+ ID=$id
+ SIZE=$numBytes\n[$numBytes of data]
+ """
+ # dispatch
+ self.samSend("STREAM", "SEND", data, ID=conn.id)
+
+ # useless, but mimics socket paradigm
+ return len(data)
+
+ #@-node:samStreamSend
+ #@+node:samStreamClose
+ def samStreamClose(self, conn):
+ """
+ DO NOT CALL DIRECTLY
+
+ Invoked by I2PSAMStream to close stream
+ Use the object's .send() method instead.
+
+ STREAM CLOSE
+ ID=$id
+ """
+ self.samSend("STREAM", "CLOSE", ID=conn.id)
+ del self.streams[conn.id]
+
+ #@-node:samStreamClose
#@+node:samNamingLookup
def samNamingLookup(self, host):
"""
@@ -472,7 +591,7 @@ class I2PSamClient:
try:
name, val = arg.split("=", 1)
except:
- logException(3, "failed to process %s" % repr(arg))
+ logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
raise
dargs[name] = val
@@ -488,6 +607,8 @@ class I2PSamClient:
+
+
#@-node:samParse
#@+node:samSend
def samSend(self, topic, subtopic, data=None, **kw):
@@ -586,7 +707,90 @@ class I2PSamClient:
def on_STREAM(self, topic, subtopic, args):
"""
Handles STREAM messages from server
+
+ STREAM STATUS
+ RESULT=$result
+ ID=$id
+ [MESSAGE=...]
+
+ STREAM CONNECTED
+ DESTINATION=$base64key
+ ID=$id
+
+ STREAM RECEIVED
+ ID=$id
+ SIZE=$numBytes\n[$numBytes of data]
+
+ STREAM CLOSED
+ RESULT=$result
+ ID=$id
+ [MESSAGE=...]
"""
+ log(4, "got %s %s %s" % (topic, subtopic, args))
+
+ # which stream?
+ id = int(args['ID'])
+
+ # result of prior connection attempt
+ if subtopic == 'STATUS':
+ # stick it on the queue that the caller is waiting on and let the
+ # caller interpret the result
+ self.streamConnectReplies[id].put(args)
+ return
+
+ # notice of incoming connection
+ if subtopic == 'CONNECTED':
+
+ # grab details
+ dest = args['DESTINATION']
+
+ # wrap it in a stream obj
+ conn = I2PSAMStream(self, id, dest)
+ self.streams[id] = conn
+
+ # and put it there for anyone calling samStreamAccept()
+ self.qNewStreams.put(conn)
+
+ # done
+ return
+
+ # notice of received data
+ elif subtopic == 'RECEIVED':
+ # grab details
+ data = args['DATA']
+
+ # lookup the connection
+ conn = self.streams.get(id, None)
+ if not conn:
+ # conn not known, just ditch
+ log(2, "got data, but don't recall any conn with id %s" % id)
+ return
+
+ # and post the received data
+ conn._notifyIncomingData(data)
+
+ log(4, "wrote data to conn's inbound queue")
+
+ # done
+ return
+
+ elif subtopic == 'CLOSED':
+ # lookup the connection
+ conn = self.streams.get(id, None)
+ if not conn:
+ # conn not known, just ditch
+ return
+
+ # mark conn as closed and forget it
+ conn._notifyIncomingData("") # special signal to close
+ conn.isOpen = 0
+ del self.streams[id]
+
+ # done
+ return
+
+
+
#@-node:on_STREAM
#@+node:on_DATAGRAM
def on_DATAGRAM(self, topic, subtopic, args):
@@ -627,6 +831,17 @@ class I2PSamClient:
res = pubkey, privkey
self.qNewDests.put(res)
#@-node:on_DEST
+ #@+node:samAllocId
+ def samAllocId(self):
+ """
+ Allocates a new unique id as required by SAM protocol
+ """
+ self.samNextIdLock.acquire()
+ id = self.samNextId
+ self.samNextId += 1
+ self.samNextIdLock.release()
+ return id
+ #@-node:samAllocId
#@+node:_recvline
def _recvline(self):
"""
@@ -680,6 +895,117 @@ class I2PSamClient:
#@-node:_sendline
#@-others
#@-node:class I2PSamClient
+#@+node:class I2PSAMStream
+class I2PSAMStream:
+ """
+ Wrapper for a stream object
+ """
+ #@ @+others
+ #@+node:__init__
+ def __init__(self, client, id, dest):
+ """
+ """
+ self.client = client
+ self.id = id
+ self.dest = dest
+
+ self.qIncomingData = Queue.Queue()
+
+ self.inbuf = ''
+ self.isOpen = 1
+ #@-node:__init__
+ #@+node:send
+ def send(self, data):
+ """
+ Sends data to a stream connection
+ """
+ # barf if stream not open
+ if not self.isOpen:
+ raise I2PStreamClosed
+
+ # can send
+ return self.client.samStreamSend(self, data)
+ #@-node:send
+ #@+node:recv
+ def recv(self, size):
+ """
+ Retrieves n bytes from peer
+ """
+ chunks = []
+
+ while self.isOpen and size > 0:
+ # try internal buffer first
+ if self.inbuf:
+ chunk = self.inbuf[:size]
+ chunklen = len(chunk)
+ self.inbuf = self.inbuf[chunklen:]
+ chunks.append(chunk)
+ size -= chunklen
+ else:
+ # replenish input buffer
+ log(4, "I2PSAMStream.recv: replenishing input buffer")
+ buf = self.qIncomingData.get()
+ if buf == '':
+ # connection closed by peer
+ self.isOpen = 0
+ break
+ else:
+ # got more data
+ log(4, "I2PSAMStream: queue returned %s" % repr(buf))
+ self.inbuf += buf
+
+ # return whatever we've got, hopefully all
+ return "".join(chunks)
+
+
+ #@-node:recv
+ #@+node:readline
+ def readline(self):
+ """
+ Read a line of text from stream, return the line without trailing newline
+
+ This method really shouldn't exist in a class that's trying to look a bit
+ like a socket object, but what the hell!
+ """
+ chars = []
+ while 1:
+ char = self.recv(1)
+ if char in ['', '\n']:
+ break
+ chars.append(char)
+ return "".join(chars)
+ #@-node:readline
+ #@+node:close
+ def close(self):
+ """
+ close this stream connection
+ """
+ log(4, "closing stream")
+ self.client.samStreamClose(self)
+ log(4, "stream closed")
+ self.isOpen = 0
+
+ # and just to make sure...
+ self.qIncomingData.put("") # busts out of recv() loops
+
+ #@-node:close
+ #@+node:__del__
+ def __del__(self):
+ """
+ Dropping last ref to this object closes stream
+ """
+ self.close()
+ #@-node:__del__
+ #@+node:_notifyIncomingData
+ def _notifyIncomingData(self, data):
+ """
+ Called by client receiver to notify incoming data
+ """
+ log(4, "got %s" % repr(data))
+ self.qIncomingData.put(data)
+ #@-node:_notifyIncomingData
+ #@-others
+#@-node:class I2PSAMStream
#@+node:class I2PRemoteSession
class I2PRemoteSession:
"""
@@ -701,7 +1027,8 @@ class I2PRemoteSession:
#@-node:__init__
#@+node:send
def send(self, peerdest, msg):
-
+ """
+ """
return self.client.send(self.dest, peerdest, msg)
#@-node:send
#@+node:recv
@@ -885,35 +1212,59 @@ def demoSTREAM():
print "Starting SAM STREAM demo..."
print
- print "Instantiating 2 more client connections..."
- c5 = I2PSamClient()
+ print "Instantiating client c6..."
c6 = I2PSamClient()
- print "Creating more dests via SAM"
- pub5, priv5 = c5.samDestGenerate()
+ print "Creating dest for c6"
pub6, priv6 = c6.samDestGenerate()
- print "Creating SAM STREAM SESSION on connection c3..."
- res = c5.samSessionCreate("STREAM", priv5)
+ print "Creating SAM STREAM SESSION on connection c6..."
+ res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE")
+ if res != 'OK':
+ print "Failed to create STREAM session on connection c6: %s" % repr(res)
+ return
+ print "STREAM Session on connection c6 created successfully"
+
+ print "Launching acceptor thread..."
+ thread.start_new_thread(demoSTREAM_thread, (c6,))
+
+ #print "sleep a while and give the server a chance..."
+ #time.sleep(10)
+
+ print "----------------------------------------"
+
+ print "Instantiating client c5..."
+ c5 = I2PSamClient()
+
+ print "Creating dest for c5"
+ pub5, priv5 = c5.samDestGenerate()
+
+ print "Creating SAM STREAM SESSION on connection c5..."
+ res = c5.samSessionCreate("STREAM", priv5, direction="CREATE")
if res != 'OK':
print "Failed to create STREAM session on connection c5: %s" % repr(res)
return
print "STREAM Session on connection c5 created successfully"
- print "Creating SAM STREAM SESSION on connection c6..."
- res = c6.samSessionCreate("STREAM", priv6)
- if res != 'OK':
- print "Failed to create STREAM session on connection c4: %s" % repr(res)
+ print "----------------------------------------"
+
+ print "Making connection from c5 to c6..."
+
+ #set_trace()
+
+ try:
+ conn_c5 = c5.samStreamConnect(pub6)
+ except:
+ print "Stream Connection failed"
return
- print "STREAM Session on connection c4 created successfully"
+ print "Stream connect succeeded"
+
+ print "Receiving from c5..."
+ buf = conn_c5.readline()
+ print "Got %s" % repr(buf)
- msg = "Hi there, this is a datagram!"
- print "sending from c5 to c6: %s" % repr(msg)
- c5.samStreamSend(pub6, msg)
-
- print "now try to receive from c6 (will block)..."
- msg1 = c6.samStreamReceive()
- print "Connection c6 got %s from %s..." % (repr(msg1), repr(remdest))
+ #print "Try to accept connection on c6..."
+ #conn_c6 = c6.sam
print
print "--------------------------------------"
@@ -921,7 +1272,31 @@ def demoSTREAM():
print "--------------------------------------"
print
+
+
+
+
#@-node:demoSTREAM
+#@+node:demoSTREAM_thread
+def demoSTREAM_thread(sess):
+
+ while 1:
+ sock = sess.samStreamAccept()
+ log(4, "got incoming connection")
+
+ print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING"
+
+ time.sleep(10)
+
+ sock.send("Hi there, what do you want?\n")
+
+ print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING"
+ time.sleep(300)
+ print "**ACCEPTOR CLOSING STREAM"
+
+ sock.close()
+
+#@-node:demoSTREAM_thread
#@+node:demo
def demo():
"""
@@ -936,10 +1311,10 @@ def demo():
print "-----------------------------------------"
print
- demoNAMING()
- demoRAW()
- demoDATAGRAM()
- #demoSTREAM()
+ #demoNAMING()
+ #demoRAW()
+ #demoDATAGRAM()
+ demoSTREAM()
print
print "-----------------------------------------"