Fixed build.xml to detect os, and launch 'jythonc' or 'jythonc.bat'
according to whether we're running on *nix or windoze. build.xml should now work on your platform, as long as you have jython installed and jython is on your execution path. Got SAM STREAMs working - test code added to i2psamclient.py as function demoSTREAM()
This commit is contained in:
@ -17,6 +17,8 @@ Run this module without arguments to see a demo in action
|
||||
#@+node:imports
|
||||
import sys, os, socket, thread, threading, Queue, traceback, StringIO, time
|
||||
|
||||
from pdb import set_trace
|
||||
|
||||
#@-node:imports
|
||||
#@+node:globals
|
||||
# -----------------------------------------
|
||||
@ -53,6 +55,11 @@ class I2PCommandFail(Exception):
|
||||
A failure in an I2CP command
|
||||
"""
|
||||
pass
|
||||
|
||||
class I2PStreamClosed(Exception):
|
||||
"""
|
||||
Stream is not open
|
||||
"""
|
||||
#@-node:exceptions
|
||||
#@+node:class I2PSamClient
|
||||
class I2PSamClient:
|
||||
@ -110,10 +117,20 @@ class I2PSamClient:
|
||||
self.qSession = Queue.Queue()
|
||||
self.qDatagrams = Queue.Queue()
|
||||
self.qRawMessages = Queue.Queue()
|
||||
|
||||
self.namingReplies = {}
|
||||
self.namingCache = {}
|
||||
|
||||
self.streams = {} # currently open streams, keyed by id
|
||||
self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id
|
||||
self.qNewStreams = Queue.Queue() # incoming connections
|
||||
|
||||
self.samNextIdLock = threading.Lock()
|
||||
self.samNextId = 1
|
||||
|
||||
self.isRunning = 1
|
||||
|
||||
|
||||
log(4, "trying connection to SAM server...")
|
||||
try:
|
||||
self.sock.connect((self.host, self.port))
|
||||
@ -312,6 +329,8 @@ class I2PSamClient:
|
||||
- dest - base64 private destination
|
||||
|
||||
Keywords:
|
||||
- direction - only used for STREAM sessions, can be RECEIVE,
|
||||
CREATE or BOTH (default BOTH)
|
||||
- i2cphost - hostname for the SAM bridge to contact i2p router on
|
||||
- i2cpport - port for the SAM bridge to contact i2p router on
|
||||
|
||||
@ -322,6 +341,20 @@ class I2PSamClient:
|
||||
kw1 = dict(kw)
|
||||
kw1['STYLE'] = self.samStyle = style
|
||||
kw1['DESTINATION'] = dest
|
||||
if style == 'STREAM':
|
||||
direction = kw.get('direction', 'BOTH')
|
||||
kw1['DIRECTION'] = direction
|
||||
if direction == 'BOTH':
|
||||
self.canAccept = 1
|
||||
self.canConnect = 1
|
||||
elif direction == 'RECEIVE':
|
||||
self.canAccept = 1
|
||||
self.canConnect = 0
|
||||
elif direction == 'CREATE':
|
||||
self.canAccept = 0
|
||||
self.canConnect = 1
|
||||
else:
|
||||
raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH")
|
||||
|
||||
# stick in i2cp host/port if specified
|
||||
if kw.has_key('i2cphost'):
|
||||
@ -423,6 +456,92 @@ class I2PSamClient:
|
||||
return None
|
||||
return self.qDatagrams.get()
|
||||
#@-node:samDatagramReceive
|
||||
#@+node:samStreamConnect
|
||||
def samStreamConnect(self, dest):
|
||||
"""
|
||||
Makes a STREAM connection to a remote dest
|
||||
|
||||
STREAM STATUS
|
||||
RESULT=$result
|
||||
ID=$id
|
||||
[MESSAGE=...]
|
||||
"""
|
||||
# need an ID
|
||||
id = self.samAllocId()
|
||||
|
||||
# create queue for connect reply
|
||||
q = self.streamConnectReplies[id] = Queue.Queue()
|
||||
|
||||
# send req
|
||||
self.samSend("STREAM", "CONNECT",
|
||||
ID=id,
|
||||
DESTINATION=dest,
|
||||
)
|
||||
|
||||
# await reply - comes back as a dict
|
||||
resp = q.get()
|
||||
|
||||
# ditch queue
|
||||
del self.streamConnectReplies[id]
|
||||
del q
|
||||
|
||||
# check out response
|
||||
result = resp['RESULT']
|
||||
if result == 'OK':
|
||||
conn = I2PSAMStream(self, id, dest)
|
||||
self.streams[id] = conn
|
||||
return conn
|
||||
else:
|
||||
msg = resp.get('MESSAGE', '')
|
||||
raise I2PCommandFail(result, msg, "STREAM CONNECT")
|
||||
|
||||
#@-node:samStreamConnect
|
||||
#@+node:samStreamAccept
|
||||
def samStreamAccept(self):
|
||||
"""
|
||||
Waits for an incoming connection, returning a wrapped conn obj
|
||||
"""
|
||||
log(4, "waiting for connection")
|
||||
conn = self.qNewStreams.get()
|
||||
log(4, "got connection")
|
||||
return conn
|
||||
#@-node:samStreamAccept
|
||||
#@+node:samStreamSend
|
||||
def samStreamSend(self, conn, data):
|
||||
"""
|
||||
DO NOT CALL THIS DIRECTLY
|
||||
|
||||
Invoked by an I2PSAMStream object to transfer data
|
||||
Use the object's .send() method instead.
|
||||
|
||||
conn is the I2PSAMStream
|
||||
|
||||
STREAM SEND
|
||||
ID=$id
|
||||
SIZE=$numBytes\n[$numBytes of data]
|
||||
"""
|
||||
# dispatch
|
||||
self.samSend("STREAM", "SEND", data, ID=conn.id)
|
||||
|
||||
# useless, but mimics socket paradigm
|
||||
return len(data)
|
||||
|
||||
#@-node:samStreamSend
|
||||
#@+node:samStreamClose
|
||||
def samStreamClose(self, conn):
|
||||
"""
|
||||
DO NOT CALL DIRECTLY
|
||||
|
||||
Invoked by I2PSAMStream to close stream
|
||||
Use the object's .send() method instead.
|
||||
|
||||
STREAM CLOSE
|
||||
ID=$id
|
||||
"""
|
||||
self.samSend("STREAM", "CLOSE", ID=conn.id)
|
||||
del self.streams[conn.id]
|
||||
|
||||
#@-node:samStreamClose
|
||||
#@+node:samNamingLookup
|
||||
def samNamingLookup(self, host):
|
||||
"""
|
||||
@ -472,7 +591,7 @@ class I2PSamClient:
|
||||
try:
|
||||
name, val = arg.split("=", 1)
|
||||
except:
|
||||
logException(3, "failed to process %s" % repr(arg))
|
||||
logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
|
||||
raise
|
||||
dargs[name] = val
|
||||
|
||||
@ -488,6 +607,8 @@ class I2PSamClient:
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#@-node:samParse
|
||||
#@+node:samSend
|
||||
def samSend(self, topic, subtopic, data=None, **kw):
|
||||
@ -586,7 +707,90 @@ class I2PSamClient:
|
||||
def on_STREAM(self, topic, subtopic, args):
|
||||
"""
|
||||
Handles STREAM messages from server
|
||||
|
||||
STREAM STATUS
|
||||
RESULT=$result
|
||||
ID=$id
|
||||
[MESSAGE=...]
|
||||
|
||||
STREAM CONNECTED
|
||||
DESTINATION=$base64key
|
||||
ID=$id
|
||||
|
||||
STREAM RECEIVED
|
||||
ID=$id
|
||||
SIZE=$numBytes\n[$numBytes of data]
|
||||
|
||||
STREAM CLOSED
|
||||
RESULT=$result
|
||||
ID=$id
|
||||
[MESSAGE=...]
|
||||
"""
|
||||
log(4, "got %s %s %s" % (topic, subtopic, args))
|
||||
|
||||
# which stream?
|
||||
id = int(args['ID'])
|
||||
|
||||
# result of prior connection attempt
|
||||
if subtopic == 'STATUS':
|
||||
# stick it on the queue that the caller is waiting on and let the
|
||||
# caller interpret the result
|
||||
self.streamConnectReplies[id].put(args)
|
||||
return
|
||||
|
||||
# notice of incoming connection
|
||||
if subtopic == 'CONNECTED':
|
||||
|
||||
# grab details
|
||||
dest = args['DESTINATION']
|
||||
|
||||
# wrap it in a stream obj
|
||||
conn = I2PSAMStream(self, id, dest)
|
||||
self.streams[id] = conn
|
||||
|
||||
# and put it there for anyone calling samStreamAccept()
|
||||
self.qNewStreams.put(conn)
|
||||
|
||||
# done
|
||||
return
|
||||
|
||||
# notice of received data
|
||||
elif subtopic == 'RECEIVED':
|
||||
# grab details
|
||||
data = args['DATA']
|
||||
|
||||
# lookup the connection
|
||||
conn = self.streams.get(id, None)
|
||||
if not conn:
|
||||
# conn not known, just ditch
|
||||
log(2, "got data, but don't recall any conn with id %s" % id)
|
||||
return
|
||||
|
||||
# and post the received data
|
||||
conn._notifyIncomingData(data)
|
||||
|
||||
log(4, "wrote data to conn's inbound queue")
|
||||
|
||||
# done
|
||||
return
|
||||
|
||||
elif subtopic == 'CLOSED':
|
||||
# lookup the connection
|
||||
conn = self.streams.get(id, None)
|
||||
if not conn:
|
||||
# conn not known, just ditch
|
||||
return
|
||||
|
||||
# mark conn as closed and forget it
|
||||
conn._notifyIncomingData("") # special signal to close
|
||||
conn.isOpen = 0
|
||||
del self.streams[id]
|
||||
|
||||
# done
|
||||
return
|
||||
|
||||
|
||||
|
||||
#@-node:on_STREAM
|
||||
#@+node:on_DATAGRAM
|
||||
def on_DATAGRAM(self, topic, subtopic, args):
|
||||
@ -627,6 +831,17 @@ class I2PSamClient:
|
||||
res = pubkey, privkey
|
||||
self.qNewDests.put(res)
|
||||
#@-node:on_DEST
|
||||
#@+node:samAllocId
|
||||
def samAllocId(self):
|
||||
"""
|
||||
Allocates a new unique id as required by SAM protocol
|
||||
"""
|
||||
self.samNextIdLock.acquire()
|
||||
id = self.samNextId
|
||||
self.samNextId += 1
|
||||
self.samNextIdLock.release()
|
||||
return id
|
||||
#@-node:samAllocId
|
||||
#@+node:_recvline
|
||||
def _recvline(self):
|
||||
"""
|
||||
@ -680,6 +895,117 @@ class I2PSamClient:
|
||||
#@-node:_sendline
|
||||
#@-others
|
||||
#@-node:class I2PSamClient
|
||||
#@+node:class I2PSAMStream
|
||||
class I2PSAMStream:
|
||||
"""
|
||||
Wrapper for a stream object
|
||||
"""
|
||||
#@ @+others
|
||||
#@+node:__init__
|
||||
def __init__(self, client, id, dest):
|
||||
"""
|
||||
"""
|
||||
self.client = client
|
||||
self.id = id
|
||||
self.dest = dest
|
||||
|
||||
self.qIncomingData = Queue.Queue()
|
||||
|
||||
self.inbuf = ''
|
||||
self.isOpen = 1
|
||||
#@-node:__init__
|
||||
#@+node:send
|
||||
def send(self, data):
|
||||
"""
|
||||
Sends data to a stream connection
|
||||
"""
|
||||
# barf if stream not open
|
||||
if not self.isOpen:
|
||||
raise I2PStreamClosed
|
||||
|
||||
# can send
|
||||
return self.client.samStreamSend(self, data)
|
||||
#@-node:send
|
||||
#@+node:recv
|
||||
def recv(self, size):
|
||||
"""
|
||||
Retrieves n bytes from peer
|
||||
"""
|
||||
chunks = []
|
||||
|
||||
while self.isOpen and size > 0:
|
||||
# try internal buffer first
|
||||
if self.inbuf:
|
||||
chunk = self.inbuf[:size]
|
||||
chunklen = len(chunk)
|
||||
self.inbuf = self.inbuf[chunklen:]
|
||||
chunks.append(chunk)
|
||||
size -= chunklen
|
||||
else:
|
||||
# replenish input buffer
|
||||
log(4, "I2PSAMStream.recv: replenishing input buffer")
|
||||
buf = self.qIncomingData.get()
|
||||
if buf == '':
|
||||
# connection closed by peer
|
||||
self.isOpen = 0
|
||||
break
|
||||
else:
|
||||
# got more data
|
||||
log(4, "I2PSAMStream: queue returned %s" % repr(buf))
|
||||
self.inbuf += buf
|
||||
|
||||
# return whatever we've got, hopefully all
|
||||
return "".join(chunks)
|
||||
|
||||
|
||||
#@-node:recv
|
||||
#@+node:readline
|
||||
def readline(self):
|
||||
"""
|
||||
Read a line of text from stream, return the line without trailing newline
|
||||
|
||||
This method really shouldn't exist in a class that's trying to look a bit
|
||||
like a socket object, but what the hell!
|
||||
"""
|
||||
chars = []
|
||||
while 1:
|
||||
char = self.recv(1)
|
||||
if char in ['', '\n']:
|
||||
break
|
||||
chars.append(char)
|
||||
return "".join(chars)
|
||||
#@-node:readline
|
||||
#@+node:close
|
||||
def close(self):
|
||||
"""
|
||||
close this stream connection
|
||||
"""
|
||||
log(4, "closing stream")
|
||||
self.client.samStreamClose(self)
|
||||
log(4, "stream closed")
|
||||
self.isOpen = 0
|
||||
|
||||
# and just to make sure...
|
||||
self.qIncomingData.put("") # busts out of recv() loops
|
||||
|
||||
#@-node:close
|
||||
#@+node:__del__
|
||||
def __del__(self):
|
||||
"""
|
||||
Dropping last ref to this object closes stream
|
||||
"""
|
||||
self.close()
|
||||
#@-node:__del__
|
||||
#@+node:_notifyIncomingData
|
||||
def _notifyIncomingData(self, data):
|
||||
"""
|
||||
Called by client receiver to notify incoming data
|
||||
"""
|
||||
log(4, "got %s" % repr(data))
|
||||
self.qIncomingData.put(data)
|
||||
#@-node:_notifyIncomingData
|
||||
#@-others
|
||||
#@-node:class I2PSAMStream
|
||||
#@+node:class I2PRemoteSession
|
||||
class I2PRemoteSession:
|
||||
"""
|
||||
@ -701,7 +1027,8 @@ class I2PRemoteSession:
|
||||
#@-node:__init__
|
||||
#@+node:send
|
||||
def send(self, peerdest, msg):
|
||||
|
||||
"""
|
||||
"""
|
||||
return self.client.send(self.dest, peerdest, msg)
|
||||
#@-node:send
|
||||
#@+node:recv
|
||||
@ -885,35 +1212,59 @@ def demoSTREAM():
|
||||
print "Starting SAM STREAM demo..."
|
||||
print
|
||||
|
||||
print "Instantiating 2 more client connections..."
|
||||
c5 = I2PSamClient()
|
||||
print "Instantiating client c6..."
|
||||
c6 = I2PSamClient()
|
||||
|
||||
print "Creating more dests via SAM"
|
||||
pub5, priv5 = c5.samDestGenerate()
|
||||
print "Creating dest for c6"
|
||||
pub6, priv6 = c6.samDestGenerate()
|
||||
|
||||
print "Creating SAM STREAM SESSION on connection c3..."
|
||||
res = c5.samSessionCreate("STREAM", priv5)
|
||||
print "Creating SAM STREAM SESSION on connection c6..."
|
||||
res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE")
|
||||
if res != 'OK':
|
||||
print "Failed to create STREAM session on connection c6: %s" % repr(res)
|
||||
return
|
||||
print "STREAM Session on connection c6 created successfully"
|
||||
|
||||
print "Launching acceptor thread..."
|
||||
thread.start_new_thread(demoSTREAM_thread, (c6,))
|
||||
|
||||
#print "sleep a while and give the server a chance..."
|
||||
#time.sleep(10)
|
||||
|
||||
print "----------------------------------------"
|
||||
|
||||
print "Instantiating client c5..."
|
||||
c5 = I2PSamClient()
|
||||
|
||||
print "Creating dest for c5"
|
||||
pub5, priv5 = c5.samDestGenerate()
|
||||
|
||||
print "Creating SAM STREAM SESSION on connection c5..."
|
||||
res = c5.samSessionCreate("STREAM", priv5, direction="CREATE")
|
||||
if res != 'OK':
|
||||
print "Failed to create STREAM session on connection c5: %s" % repr(res)
|
||||
return
|
||||
print "STREAM Session on connection c5 created successfully"
|
||||
|
||||
print "Creating SAM STREAM SESSION on connection c6..."
|
||||
res = c6.samSessionCreate("STREAM", priv6)
|
||||
if res != 'OK':
|
||||
print "Failed to create STREAM session on connection c4: %s" % repr(res)
|
||||
print "----------------------------------------"
|
||||
|
||||
print "Making connection from c5 to c6..."
|
||||
|
||||
#set_trace()
|
||||
|
||||
try:
|
||||
conn_c5 = c5.samStreamConnect(pub6)
|
||||
except:
|
||||
print "Stream Connection failed"
|
||||
return
|
||||
print "STREAM Session on connection c4 created successfully"
|
||||
print "Stream connect succeeded"
|
||||
|
||||
print "Receiving from c5..."
|
||||
buf = conn_c5.readline()
|
||||
print "Got %s" % repr(buf)
|
||||
|
||||
msg = "Hi there, this is a datagram!"
|
||||
print "sending from c5 to c6: %s" % repr(msg)
|
||||
c5.samStreamSend(pub6, msg)
|
||||
|
||||
print "now try to receive from c6 (will block)..."
|
||||
msg1 = c6.samStreamReceive()
|
||||
print "Connection c6 got %s from %s..." % (repr(msg1), repr(remdest))
|
||||
#print "Try to accept connection on c6..."
|
||||
#conn_c6 = c6.sam
|
||||
|
||||
print
|
||||
print "--------------------------------------"
|
||||
@ -921,7 +1272,31 @@ def demoSTREAM():
|
||||
print "--------------------------------------"
|
||||
print
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#@-node:demoSTREAM
|
||||
#@+node:demoSTREAM_thread
|
||||
def demoSTREAM_thread(sess):
|
||||
|
||||
while 1:
|
||||
sock = sess.samStreamAccept()
|
||||
log(4, "got incoming connection")
|
||||
|
||||
print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING"
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
sock.send("Hi there, what do you want?\n")
|
||||
|
||||
print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING"
|
||||
time.sleep(300)
|
||||
print "**ACCEPTOR CLOSING STREAM"
|
||||
|
||||
sock.close()
|
||||
|
||||
#@-node:demoSTREAM_thread
|
||||
#@+node:demo
|
||||
def demo():
|
||||
"""
|
||||
@ -936,10 +1311,10 @@ def demo():
|
||||
print "-----------------------------------------"
|
||||
print
|
||||
|
||||
demoNAMING()
|
||||
demoRAW()
|
||||
demoDATAGRAM()
|
||||
#demoSTREAM()
|
||||
#demoNAMING()
|
||||
#demoRAW()
|
||||
#demoDATAGRAM()
|
||||
demoSTREAM()
|
||||
|
||||
print
|
||||
print "-----------------------------------------"
|
||||
|
Reference in New Issue
Block a user