From 5214436d18f5b6d82a342c269790ed6f6903eb5c Mon Sep 17 00:00:00 2001 From: jrandom Date: Wed, 21 Jul 2004 07:42:29 +0000 Subject: [PATCH] initial import of Connelly's public domain I2P python lib --- apps/sam/python/bugs.txt | 18 + apps/sam/python/doc/guide/eeproxy.html | 33 + apps/sam/python/doc/guide/i2p.eep.html | 55 ++ apps/sam/python/doc/guide/i2p.html | 61 ++ apps/sam/python/doc/guide/i2p.router.html | 87 ++ apps/sam/python/doc/guide/i2p.sam.html | 308 +++++++ apps/sam/python/doc/guide/index.html | 76 ++ apps/sam/python/doc/guide/samproxy.html | 37 + apps/sam/python/doc/index.html | 14 + apps/sam/python/doc/pydoc/i2p.eep.html | 40 + apps/sam/python/doc/pydoc/i2p.html | 90 ++ apps/sam/python/doc/pydoc/i2p.router.html | 38 + apps/sam/python/doc/pydoc/i2p.sam.html | 374 +++++++++ apps/sam/python/doc/pydoc/i2p.samclasses.html | 314 +++++++ apps/sam/python/doc/pydoc/makedoc.py | 36 + apps/sam/python/readme.txt | 34 + apps/sam/python/setup.py | 11 + apps/sam/python/src/examples/datagram.py | 15 + .../python/src/examples/datagram_noblock.py | 20 + .../python/src/examples/datagram_server.py | 14 + apps/sam/python/src/examples/dos.py | 33 + apps/sam/python/src/examples/examples.txt | 14 + apps/sam/python/src/examples/raw.py | 10 + apps/sam/python/src/examples/raw_noblock.py | 19 + apps/sam/python/src/examples/raw_server.py | 13 + apps/sam/python/src/examples/stream.py | 11 + apps/sam/python/src/examples/stream_eepget.py | 17 + .../sam/python/src/examples/stream_noblock.py | 39 + apps/sam/python/src/examples/stream_server.py | 28 + apps/sam/python/src/i2p/__init__.py | 20 + apps/sam/python/src/i2p/eep.py | 47 ++ apps/sam/python/src/i2p/router.py | 188 +++++ apps/sam/python/src/i2p/sam.py | 706 ++++++++++++++++ apps/sam/python/src/i2p/samclasses.py | 769 ++++++++++++++++++ apps/sam/python/src/i2p/test/readme.txt | 6 + apps/sam/python/src/i2p/test/test_eep.py | 37 + apps/sam/python/src/i2p/test/test_sam.py | 442 ++++++++++ .../python/src/i2p/test/test_samclasses.py | 446 ++++++++++ apps/sam/python/todo.txt | 13 + 39 files changed, 4533 insertions(+) create mode 100644 apps/sam/python/bugs.txt create mode 100644 apps/sam/python/doc/guide/eeproxy.html create mode 100644 apps/sam/python/doc/guide/i2p.eep.html create mode 100644 apps/sam/python/doc/guide/i2p.html create mode 100644 apps/sam/python/doc/guide/i2p.router.html create mode 100644 apps/sam/python/doc/guide/i2p.sam.html create mode 100644 apps/sam/python/doc/guide/index.html create mode 100644 apps/sam/python/doc/guide/samproxy.html create mode 100644 apps/sam/python/doc/index.html create mode 100644 apps/sam/python/doc/pydoc/i2p.eep.html create mode 100644 apps/sam/python/doc/pydoc/i2p.html create mode 100644 apps/sam/python/doc/pydoc/i2p.router.html create mode 100644 apps/sam/python/doc/pydoc/i2p.sam.html create mode 100644 apps/sam/python/doc/pydoc/i2p.samclasses.html create mode 100644 apps/sam/python/doc/pydoc/makedoc.py create mode 100644 apps/sam/python/readme.txt create mode 100644 apps/sam/python/setup.py create mode 100644 apps/sam/python/src/examples/datagram.py create mode 100644 apps/sam/python/src/examples/datagram_noblock.py create mode 100644 apps/sam/python/src/examples/datagram_server.py create mode 100644 apps/sam/python/src/examples/dos.py create mode 100644 apps/sam/python/src/examples/examples.txt create mode 100644 apps/sam/python/src/examples/raw.py create mode 100644 apps/sam/python/src/examples/raw_noblock.py create mode 100644 apps/sam/python/src/examples/raw_server.py create mode 100644 apps/sam/python/src/examples/stream.py create mode 100644 apps/sam/python/src/examples/stream_eepget.py create mode 100644 apps/sam/python/src/examples/stream_noblock.py create mode 100644 apps/sam/python/src/examples/stream_server.py create mode 100644 apps/sam/python/src/i2p/__init__.py create mode 100644 apps/sam/python/src/i2p/eep.py create mode 100644 apps/sam/python/src/i2p/router.py create mode 100644 apps/sam/python/src/i2p/sam.py create mode 100644 apps/sam/python/src/i2p/samclasses.py create mode 100644 apps/sam/python/src/i2p/test/readme.txt create mode 100644 apps/sam/python/src/i2p/test/test_eep.py create mode 100644 apps/sam/python/src/i2p/test/test_sam.py create mode 100644 apps/sam/python/src/i2p/test/test_samclasses.py create mode 100644 apps/sam/python/todo.txt diff --git a/apps/sam/python/bugs.txt b/apps/sam/python/bugs.txt new file mode 100644 index 000000000..93005b13b --- /dev/null +++ b/apps/sam/python/bugs.txt @@ -0,0 +1,18 @@ + +Known Bugs: + * TunnelServer may crash the I2P router in the following + ways when a large file is downloaded: + + * Out of memory exception (for large files) + * Mysterious router death with no errors in the router logs + (more recently) + * BUG! in SAM proxy + See http://oregonstate.edu/~barnesc/temp/sam_crash.txt + * A small number of datagram packets sent are lost (even in a local + loopback). This is apparently a bug in I2P. + * Errors raised for sockets are non entirely consistent. + See todo.txt for how to fix this. + * A session does not close until a program exits. + This should be fine once I2P is patched to allow multiple + programs to use a single session at once. + * i2p.router.start() does not work. \ No newline at end of file diff --git a/apps/sam/python/doc/guide/eeproxy.html b/apps/sam/python/doc/guide/eeproxy.html new file mode 100644 index 000000000..f5a2e6945 --- /dev/null +++ b/apps/sam/python/doc/guide/eeproxy.html @@ -0,0 +1,33 @@ + +Eeproxy - Wikipedia + + + + + + + + +

Eeproxy

From Python-I2P. + + +

+The Eeproxy is run by the I2P router. The proxy is normally used for web browsers, as a means of accessing eepsites. + + +

+The eeproxy is usually available at http://127.0.0.1:4444/. + +

+

+

+ + \ No newline at end of file diff --git a/apps/sam/python/doc/guide/i2p.eep.html b/apps/sam/python/doc/guide/i2p.eep.html new file mode 100644 index 000000000..3cd24db4d --- /dev/null +++ b/apps/sam/python/doc/guide/i2p.eep.html @@ -0,0 +1,55 @@ + +User's Guide:i2p.eep - Wikipedia + + + + + + + + +

User's Guide:i2p.eep

From Python-I2P. + + +

+Module i2p.eep allows Python programs to access the Eeproxy. + + +

+With this module, a program can easily download eepsites. + +

+ +

Functions

+ +

+urlopen(url, eepaddr='127.0.0.1:4444') +

+urlget(url, eepaddr='127.0.0.1:4444') + + +

+

+

+ + + \ No newline at end of file diff --git a/apps/sam/python/doc/guide/i2p.html b/apps/sam/python/doc/guide/i2p.html new file mode 100644 index 000000000..a5922a4b8 --- /dev/null +++ b/apps/sam/python/doc/guide/i2p.html @@ -0,0 +1,61 @@ + +User's Guide:i2p - Wikipedia + + + + + + + + +

User's Guide:i2p

From Python-I2P. + + +

+Package i2p is a container package for more specific modules. + + +

+It exports the following names: +

+ +

+class Error(Exception): +

+ +

+class RouterError(Error): + +

+ +

+

+

+ + + \ No newline at end of file diff --git a/apps/sam/python/doc/guide/i2p.router.html b/apps/sam/python/doc/guide/i2p.router.html new file mode 100644 index 000000000..782496f5f --- /dev/null +++ b/apps/sam/python/doc/guide/i2p.router.html @@ -0,0 +1,87 @@ + +User's Guide:i2p.router - Wikipedia + + + + + + + + +

User's Guide:i2p.router

From Python-I2P. + + +

+Module i2p.sam allows Python programs to control the I2P router. + + +

+ +

Functions

+ +

+check(dir=None) +

+find(dir=None) + +start(dir=None, hidden=False) + +stop(dir=None, force=False) + + +

+

+

+ + diff --git a/apps/sam/python/doc/guide/i2p.sam.html b/apps/sam/python/doc/guide/i2p.sam.html new file mode 100644 index 000000000..b697927a5 --- /dev/null +++ b/apps/sam/python/doc/guide/i2p.sam.html @@ -0,0 +1,308 @@ + +User's Guide:i2p.sam - Wikipedia + + + + + + + + +

User's Guide:i2p.sam

From Python-I2P. + + +

+Module i2p.sam allows Python programs to access the SAM proxy. + + +

+With this module, a program can send stream data, datagrams, and raw packets across the I2P network. + +

+ +

+Table of contents
+ + + + + +

+

Sockets

+ +

+ +socket(session, type, samaddr='127.0.0.1:7656', **kwargs) +

+socket() object properties: + +poll() + + + +

+resolve(host, samaddr='127.0.0.1:7656') +

+ +

+select(readlist, writelist, errlist, timeout=None) +

+ +

+ +

Tunnels

+ +

+Tunnels allow stream sockets to be joined, so that connections to a listening socket are relayed to one or more sending sockets. This allows an ordinary web server to be exposed as an I2P Destination, or an I2P Destination to be bound as a local port, and so on. + +

+class Tunnel(self, receive, make_send, nconnect=-1, timeout=60.0) +

+ +

+close() +

+ +

+ +

Tunnel Server

+ +

+class TunnelServer(session, port, samaddr='127.0.0.1:7656', nconnect=-1, timeout=None, **kwargs) +

+ +

+TunnelServer properties: + +

+ +

+ +

Tunnel Client

+ +

+class TunnelClient(session, port, dest, samaddr='127.0.0.1:7656', nconnect=-1, timeout=None, **kwargs) +

+ +

+TunnelClient properties: +

+ +

+ +

Errors

+ +

+class Error(i2p.Error) +

+class BlockError(Error) + + +class ClosedError(Error) + +class NetworkError(Error) + + +

+ +

Constants

+ +

+Socket types +

+Packet sizes + +Flags for recv() + +Polling flags + + + +

+

+

+ + diff --git a/apps/sam/python/doc/guide/index.html b/apps/sam/python/doc/guide/index.html new file mode 100644 index 000000000..3c9f587d0 --- /dev/null +++ b/apps/sam/python/doc/guide/index.html @@ -0,0 +1,76 @@ + +Main Page - Wikipedia + + + + + + + + +

Main Page

From Python-I2P. + + +

+Python-I2P is a Python interface to I2P. + + +

+ +

Quick Start

+ +

+Install: + +

+

+ +

+Use: + +

+ +

+ +

+ +

User's Guide

+ +

+The following modules are available: + +

+

+ +

+

+

+ + diff --git a/apps/sam/python/doc/guide/samproxy.html b/apps/sam/python/doc/guide/samproxy.html new file mode 100644 index 000000000..ff43ebdd6 --- /dev/null +++ b/apps/sam/python/doc/guide/samproxy.html @@ -0,0 +1,37 @@ + +SAM proxy - Wikipedia + + + + + + + + +

SAM proxy

From Python-I2P. + + +

+A SAM proxy is run by the I2P router. The proxy allows streams, datagrams, and raw packets to be sent through the I2P network. A client application uses a telnet-like session to communicate with the proxy. In this way, the core features of the I2P library can be used by any language. + + +

+The protocol used for SAM is described in SAM 1.0. + +

+In practice, a SAM library is usually written, so that client applications do not need to use the low-level SAM commands. + +

+

+

+ + + \ No newline at end of file diff --git a/apps/sam/python/doc/index.html b/apps/sam/python/doc/index.html new file mode 100644 index 000000000..ff847ead4 --- /dev/null +++ b/apps/sam/python/doc/index.html @@ -0,0 +1,14 @@ + + + +

Python-I2P

+ +Documentation: + + + + + \ No newline at end of file diff --git a/apps/sam/python/doc/pydoc/i2p.eep.html b/apps/sam/python/doc/pydoc/i2p.eep.html new file mode 100644 index 000000000..553ef3d9e --- /dev/null +++ b/apps/sam/python/doc/pydoc/i2p.eep.html @@ -0,0 +1,40 @@ + + +Python: module i2p.eep + + + + +
 
+ 
i2p.eep
index
d:\code\i2p\i2p\eep.py
+

Eeproxy Python API

+

+ + + + + +
 
+Modules
       
urllib2
+

+ + + + + +
 
+Functions
       
urlget(url, eepaddr='127.0.0.1:4444')
Get contents of an eepsite.
+Example: urlget('http://duck.i2p/').
+
urlopen(url, eepaddr='127.0.0.1:4444')
Like urllib2.urlopen(url), but only works for eep-sites.
+Example: f = urlopen('http://duck.i2p/index.html')
+

+ + + + + +
 
+Data
       eepaddr = '127.0.0.1:4444'
+ \ No newline at end of file diff --git a/apps/sam/python/doc/pydoc/i2p.html b/apps/sam/python/doc/pydoc/i2p.html new file mode 100644 index 000000000..3b884e22e --- /dev/null +++ b/apps/sam/python/doc/pydoc/i2p.html @@ -0,0 +1,90 @@ + + +Python: package i2p + + + + +
 
+ 
i2p
index
d:\code\i2p\i2p\__init__.py
+

i2p -- I2P Python interface

+

+ + + + + +
 
+Package Contents
       
eep
+
router
+
sam
+
samclasses
+

+ + + + + +
 
+Classes
       
+
exceptions.Exception +
+
+
Error +
+
+
RouterError +
+
+
+
+
+

+ + + + + + + +
 
+class Error(exceptions.Exception)
   Base class for all I2P errors.
 
 Methods inherited from exceptions.Exception:
+
__getitem__(...)
+ +
__init__(...)
+ +
__str__(...)
+ +

+ + + + + + + +
 
+class RouterError(Error)
   Could not connect to router.
 
 
Method resolution order:
+
RouterError
+
Error
+
exceptions.Exception
+
+
+Methods inherited from exceptions.Exception:
+
__getitem__(...)
+ +
__init__(...)
+ +
__str__(...)
+ +

+ + + + + +
 
+Data
       __all__ = ['Error', 'RouterError', 'sam', 'eep', 'router']
+ \ No newline at end of file diff --git a/apps/sam/python/doc/pydoc/i2p.router.html b/apps/sam/python/doc/pydoc/i2p.router.html new file mode 100644 index 000000000..e4b888345 --- /dev/null +++ b/apps/sam/python/doc/pydoc/i2p.router.html @@ -0,0 +1,38 @@ + + +Python: module i2p.router + + + + +
 
+ 
i2p.router
index
d:\code\i2p\i2p\router.py
+

Router Control API for Python

+

+ + + + + +
 
+Modules
       
i2p
+os
+
socket
+sys
+
threading
+time
+
urllib2
+

+ + + + + +
 
+Data
       check_addrlist = ['127.0.0.1:7656', '127.0.0.1:4444']
+our_router = False
+our_router_lock = <thread.lock object at 0x008AD0F0>
+router_config = 'router.config'
+ \ No newline at end of file diff --git a/apps/sam/python/doc/pydoc/i2p.sam.html b/apps/sam/python/doc/pydoc/i2p.sam.html new file mode 100644 index 000000000..dcf9e2029 --- /dev/null +++ b/apps/sam/python/doc/pydoc/i2p.sam.html @@ -0,0 +1,374 @@ + + +Python: module i2p.sam + + + + +
 
+ 
i2p.sam
index
d:\code\i2p\i2p\sam.py
+

SAM Python API

+

+ + + + + +
 
+Modules
       
Queue
+copy
+i2p
+
select
+socket
+i2p.samclasses
+
thread
+threading
+time
+

+ + + + + +
 
+Classes
       
+
i2p.Error(exceptions.Exception) +
+
+
Error +
+
+
BlockError +
ClosedError +
NetworkError +
Timeout +
+
+
+
+
Poll +
Socket +
Tunnel +
+
+
TunnelClient +
TunnelServer +
+
+
+

+ + + + + + + +
 
+class BlockError(Error)
   Socket call would have blocked.
 
 
Method resolution order:
+
BlockError
+
Error
+
i2p.Error
+
exceptions.Exception
+
+
+Methods inherited from exceptions.Exception:
+
__getitem__(...)
+ +
__init__(...)
+ +
__str__(...)
+ +

+ + + + + + + +
 
+class ClosedError(Error)
   A command was used on a socket that closed gracefully.
 
 
Method resolution order:
+
ClosedError
+
Error
+
i2p.Error
+
exceptions.Exception
+
+
+Methods inherited from exceptions.Exception:
+
__getitem__(...)
+ +
__init__(...)
+ +
__str__(...)
+ +

+ + + + + + + +
 
+class Error(i2p.Error)
   Base class for all SAM errors.
 
 
Method resolution order:
+
Error
+
i2p.Error
+
exceptions.Exception
+
+
+Methods inherited from exceptions.Exception:
+
__getitem__(...)
+ +
__init__(...)
+ +
__str__(...)
+ +

+ + + + + + + +
 
+class NetworkError(Error)
   Network error occurred within I2P.
+The error object is a 2-tuple: (errtag, errdesc).
+errtag is a SAM error string,
+errdesc is a human readable error description.
 
 
Method resolution order:
+
NetworkError
+
Error
+
i2p.Error
+
exceptions.Exception
+
+
+Methods inherited from exceptions.Exception:
+
__getitem__(...)
+ +
__init__(...)
+ +
__str__(...)
+ +

+ + + + + + + +
 
+class Poll
   Class implementing poll interface.  Works for Python sockets
+and SAM sockets.
 
 Methods defined here:
+
__init__(self)
+ +
poll(self, timeout=None)
+ +
register(self, fd, eventmask=13)
+ +
unregister(self, fd)
+ +

+ + + + + + + +
 
+class Socket
   A socket object.
 
 Methods defined here:
+
__deepcopy__(self, memo)
+ +
__init__(self, session, type, samaddr='127.0.0.1:7656', **kwargs)
Equivalent to socket().
+ +
accept(self)
+ +
bind(self, address)
+ +
close(self)
+ +
connect(self, address)
+ +
connect_ex(self, address)
+ +
getpeername(self)
+ +
getsockname(self)
+ +
gettimeout(self)
+ +
listen(self, backlog)
+ +
makefile(self, mode='r', bufsize=-1)
+ +
recv(self, bufsize, flags=0)
+ +
recvfrom(self, bufsize, flags=0)
For a datagram or raw socket, bufsize = -1 indicates that the
+entire packet should be retrieved.
+ +
send(self, string, flags=0)
+ +
sendall(self, string, flags=0)
+ +
sendto(self, string, flags, address)
+ +
setblocking(self, flag)
+ +
settimeout(self, value)
+ +
+Properties defined here:
+
dest
+
Local I2P Destination of socket
+
+
session
+
Session name
+
+
type
+
Socket type: SOCK_STREAM, SOCK_DGRAM, or SOCK_RAW.
+
+

+ + + + + + + +
 
+class Timeout(Error)
   Time out occurred for a socket which had timeouts enabled
+via a prior call to settimeout().
 
 
Method resolution order:
+
Timeout
+
Error
+
i2p.Error
+
exceptions.Exception
+
+
+Methods inherited from exceptions.Exception:
+
__getitem__(...)
+ +
__init__(...)
+ +
__str__(...)
+ +

+ + + + + +
 
+class Tunnel
    Methods defined here:
+
__init__(self, receive, make_send, nconnect=-1, timeout=60.0)
Tunnel relays connections from a 'receive' socket to one
+or more 'send' sockets.  The receive socket must be bound
+and listening.  For each incoming connection, a new send
+socket is created by calling make_send().  Data is then
+exchanged between the created streams until one socket is
+closed.  nconnect is the maximum number of simultaneous
+connections (-1 for infinite), and timeout is the time that
+a single connection can last for (None allows a connection
+to last forever).

+Sockets must accept stream traffic and support the Python
+socket interface.  A separate daemonic thread is created to
+manage the tunnel.  For high performance, make_send() should
+make a socket and connect in non-blocking mode (you should
+catch and discard the sam.BlockError or socket.error due to
+executing connect on a non-blocking socket).

+Security Note:
+A firewall is needed to maintain the end user's anonymity.
+An attacker could keep a tunnel socket open by pinging it
+regularly.  The accepted sockets from 'receive' must prevent
+this by closing down eventually.

+Socket errors do not cause the Tunnel to shut down.
+ +
close(self)
Close all connections made for this tunnel.
+ +

+ + + + + +
 
+class TunnelClient(Tunnel)
    Methods defined here:
+
__init__(self, session, port, dest, samaddr='127.0.0.1:7656', nconnect=-1, timeout=None, **kwargs)
Tunnels localhost:port --> I2P Destination dest.

+A session named 'session' is created locally, for purposes
+of routing to 'dest'.  nconnect and timeout are the maximum
+number of connections and maximum time per connection.  All
+other arguments are passed to sam.socket().  This call blocks
+until the tunnel is ready.
+ +
+Properties defined here:
+
dest
+
get = 'Local Destination used for routing.'
+
+
remotedest
+
Remote Destination.
+
+
session
+
get = 'Session name for local Destination.'
+
+
+Methods inherited from Tunnel:
+
close(self)
Close all connections made for this tunnel.
+ +

+ + + + + +
 
+class TunnelServer(Tunnel)
    Methods defined here:
+
__init__(self, session, port, samaddr='127.0.0.1:7656', nconnect=-1, timeout=None, **kwargs)
Tunnels incoming SAM streams --> localhost:port.

+nconnect and timeout are the maximum number of connections
+and maximum time per connection.  All other arguments are
+passed to sam.socket().  This call blocks until the tunnel
+is ready.
+ +
+Properties defined here:
+
dest
+
I2P Destination of server.
+
+
session
+
Session name for server.
+
+
+Methods inherited from Tunnel:
+
close(self)
Close all connections made for this tunnel.
+ +

+ + + + + +
 
+Data
       MAX_DGRAM = 31744
+MAX_RAW = 32768
+MSG_DONTWAIT = 128
+MSG_PEEK = 2
+MSG_WAITALL = 64
+POLLERR = 8
+POLLHUP = 16
+POLLIN = 1
+POLLNVAL = 32
+POLLOUT = 4
+POLLPRI = 1
+SOCK_DGRAM = 2
+SOCK_RAW = 3
+SOCK_STREAM = 1
+samaddr = '127.0.0.1:7656'
+samver = 1.0
+ \ No newline at end of file diff --git a/apps/sam/python/doc/pydoc/i2p.samclasses.html b/apps/sam/python/doc/pydoc/i2p.samclasses.html new file mode 100644 index 000000000..111b1e48a --- /dev/null +++ b/apps/sam/python/doc/pydoc/i2p.samclasses.html @@ -0,0 +1,314 @@ + + +Python: module i2p.samclasses + + + + +
 
+ 
i2p.samclasses
index
d:\code\i2p\i2p\samclasses.py
+

Lower-level SAM API, interfaces with SAM Bridge.

+For internal use only.

+Use the higher level i2p.sam module for your own programs.

+For details on SAM, see "Simple Anonymous Messaging (SAM) v1.0,"
+as published by jrandom.

+Class Overview:

+  SAMTerminal:     Message sender/reader, talks to SAM Bridge
+                   through a single socket.
+  StringBuffer:    Queue for character data.
+  BaseSession:     SAM session classes are derived from this.
+  StreamSession:   Manipulate a SAM stream session through a
+                   threadsafe, high-level interface.
+  DatagramSession: SAM datagram session, threadsafe, high level.
+  RawSession:      SAM raw session, threadsafe, high level.

+Note that a 'None' timeout is an infinite timeout: it
+blocks forever if necessary.

+Todo:
+  * Error handling is a huge mess.  Neaten it up.
+    Subclass a ErrorMixin class, then use set_error(e),
+    check_error(), get_error().
+  * Streams are a huge mess.  Neaten them up.
+  * This whole interface is a tad confusing.  Neaten it up.

+

+ + + + + +
 
+Modules
       
Queue
+i2p
+random
+
shlex
+socket
+string
+
sys
+thread
+threading
+
time
+traceback
+

+ + + + + +
 
+Classes
       
+
BaseSession +
+
+
DatagramSession +
RawSession +
StreamSession +
+
+
Deque +
+
+
StringBuffer +
+
+
SAMTerminal +
Stream +
+

+ + + + + + + +
 
+class BaseSession
   Base session, from which StreamSessionDatagramSession,
+and RawSession are derived.
 
 Methods defined here:
+
__init__(self, addr='')
+ +
close(self)
Close the session.
+ +

+ + + + + + + +
 
+class DatagramSession(BaseSession)
   Datagram session.  All methods are blocking and threadsafe.
 
 Methods defined here:
+
__init__(self, name, addr='', **kwargs)
+ +
__len__(self)
Number of packets in read buffer.
+ +
recv(self, timeout=None, peek=False)
Get a single packet.  Blocks for up to timeout seconds if
+n > 0 and no packet is available (timeout=None means wait
+forever).  If still no packet is available, raises BlockError
+or Timeout.  Returns the pair (data, address).  If peek is
+True, the data is not removed.
+ +
send(self, s, dest)
Send packet with contents s to given destination.
+ +
+Methods inherited from BaseSession:
+
close(self)
Close the session.
+ +

+ + + + + + + +
 
+class Deque
   A double-ended queue.
 
 Methods defined here:
+
__init__(self)
+ +
__len__(self)
Number of items in the deque.
+ +
pop_first(self)
Pop an item off the beginning of the deque, and return it.
+ +
pop_last(self)
Pop an item off the end of the deque, and return it.
+ +
push_first(self, obj)
Prepend obj to the beginning of the deque.
+ +
push_last(self, obj)
Append obj to the end of the deque.
+ +

+ + + + + + + +
 
+class RawSession(BaseSession)
   Raw session.  All methods are blocking and threadsafe.
 
 Methods defined here:
+
__init__(self, name, addr='', **kwargs)
+ +
__len__(self)
Number of packets in read buffer.
+ +
recv(self, timeout=None, peek=False)
Identical to DatagramSocket.recv.  The from address is an
+empty string.
+ +
send(self, s, dest)
Send packet with contents s to given destination.
+ +
+Methods inherited from BaseSession:
+
close(self)
Close the session.
+ +

+ + + + + + + +
 
+class SAMTerminal
   Message-by-message communication with SAM through a single
+socket.  _on_* messages are dispatched to msgobj.
 
 Methods defined here:
+
__init__(self, addr, msgobj)
+ +
check(self)
Raise an error if terminal was closed, otherwise do
+nothing.
+ +
check_message(self, kwargs)
Raises an error if kwargs['RESULT'] != 'OK'.
+ +
close(self)
Close the SAM terminal.
+ +
on_message(self, msg, kwargs)
Process a SAM message that was received.  Dispatch to
+_on_MESSAGE_NAME(**kwargs).
+ +
queue_get(self, q)
Identical to q.get() unless a call to check() fails,
+in which case the waiting is cut short with an error.
+ +
send_message(self, msg)
Send a message to the SAM bridge.  A newline will be
+automatically added if none is present.
+ +

+ + + + + + + +
 
+class Stream
   Receives and sends data for an individual stream.
 
 Methods defined here:
+
__del__(self)
+ +
__init__(self, parent, remotedest, id, didconnect=True)
+ +
__len__(self)
Current length of read buffer.
+ +
close(self)
Close the stream.  Threadsafe.
+ +
on_close(self, e)
+ +
on_receive(self, s)
+ +
recv(self, n, timeout=None, peek=False, waitall=False)
Reads up to n bytes in a manner identical to socket.recv.
+Blocks for up to timeout seconds if n > 0 and no data is
+available (timeout=None means wait forever).  If still no data
+is available, raises BlockError or Timeout.  For a closed
+stream, recv will read the data stored in the buffer until
+EOF, at which point the read data will be truncated.  If peek
+is True, the data is not removed.  If waitall is True, reads
+exactly n bytes, or raises BlockError or Timeout as
+appropriate.  Returns data.
+ +
send(self, s)
Sends the string s, blocking if necessary.
+ +

+ + + + + + + +
 
+class StreamSession(BaseSession)
   Stream session.  All methods are blocking and threadsafe.
 
 Methods defined here:
+
__init__(self, name, addr='', **kwargs)
+ +
__len__(self)
Unconnected session; has no read data available.
+ +
accept(self, timeout=None)
Wait for incoming connection, and return a Stream object
+for it.
+ +
connect(self, dest, timeout=None)
Create a stream connected to remote destination 'dest'.  The
+id is random.  If the timeout is exceeded, do NOT raise an
+error; rather, return a Stream object with .didconnect set
+to False.
+ +
listen(self, backlog)
Set maximum number of queued connections.
+ +
+Methods inherited from BaseSession:
+
close(self)
Close the session.
+ +

+ + + + + + + +
 
+class StringBuffer(Deque)
   A FIFO for characters.  Strings can be efficiently
+appended to the end, and read from the beginning.

+Example:
+  B = StringBuffer('Hello W')
+  B.append('orld!')
+  print B.read(5)        # 'Hello'
+  print B.read()         # 'World!'
 
 Methods defined here:
+
__init__(self, s='')
+ +
__len__(self)
+ +
__repr__(self)
+ +
__str__(self)
+ +
append(self, s)
Append string data to the end of the buffer.
+ +
peek(self, n=None)
Like read(), but do not remove the data that is returned.
+ +
prepend(self, s)
Prepend string data to the beginning of the buffer.
+ +
read(self, n=None)
Read n bytes of data (or less if less data available) from the
+beginning of the buffer.  The data is removed.  If n is
+omitted, read the entire buffer.
+ +
+Methods inherited from Deque:
+
pop_first(self)
Pop an item off the beginning of the deque, and return it.
+ +
pop_last(self)
Pop an item off the end of the deque, and return it.
+ +
push_first(self, obj)
Prepend obj to the beginning of the deque.
+ +
push_last(self, obj)
Append obj to the end of the deque.
+ +

+ + + + + +
 
+Data
       sam_log = False
+ \ No newline at end of file diff --git a/apps/sam/python/doc/pydoc/makedoc.py b/apps/sam/python/doc/pydoc/makedoc.py new file mode 100644 index 000000000..73857ae2f --- /dev/null +++ b/apps/sam/python/doc/pydoc/makedoc.py @@ -0,0 +1,36 @@ + +# ------------------------------------------------------------- +# makedoc.py: Make pydoc documentation for Python SAM API +# ------------------------------------------------------------- + +import os, inspect +import pydoc as pydoc_ + +def pydoc(args): + """Run pydoc (command line) with given argument string.""" + filename = inspect.getsourcefile(pydoc_) + os.system('python ' + filename + ' ' + args) + +def move(f1, f2): + """Moves filename f1 to filename f2, overwriting if f2 already exists.""" + try: os.remove(f2) + except: pass + os.rename(f1, f2) + +def makedoc(): + """Make all HTML documentation for Python I2P library.""" + modules = ['i2p', 'i2p.sam', 'i2p.eep', 'i2p.router', 'i2p.samclasses'] + + origdir = os.getcwd() + os.chdir('../..') + + for m in modules: + pydoc('-w ' + m) + + os.chdir(origdir) + + for m in modules: + move('../../' + m + '.html', './' + m + '.html') + +if __name__ == '__main__': + makedoc() \ No newline at end of file diff --git a/apps/sam/python/readme.txt b/apps/sam/python/readme.txt new file mode 100644 index 000000000..d67dffc2e --- /dev/null +++ b/apps/sam/python/readme.txt @@ -0,0 +1,34 @@ + +---------------------------------------- +Python-I2P v0.9 +---------------------------------------- + +Python-I2P is a Python interface to I2P. + +All files in this directory and subdirectories +have been placed in the public domain by +Connelly Barnes. + +---------------------------------------- +Quick Start +---------------------------------------- + +Install: + + python setup.py install + +Use: + + >>> from i2p import sam + >>> s = sam.socket('Alice', sam.SOCK_STREAM) + >>> s.connect('duck.i2p') + >>> s.send('GET / HTTP/1.0\r\n\r\n') + >>> s.recv(1000) + (Response from duck.i2p) + + +---------------------------------------- +Full Start +---------------------------------------- + +See the docs directory for HTML documentation. diff --git a/apps/sam/python/setup.py b/apps/sam/python/setup.py new file mode 100644 index 000000000..abc88897f --- /dev/null +++ b/apps/sam/python/setup.py @@ -0,0 +1,11 @@ + +from distutils.core import setup + +setup(name="Python I2P API", + version="0.9", + description="Python Interface to I2P", + author="Connelly Barnes", + author_email="'Y29ubmVsbHliYXJuZXNAeWFob28uY29t\n'.decode('base64')", + url="http://www.i2p.net/", + packages=['i2p'], + ) diff --git a/apps/sam/python/src/examples/datagram.py b/apps/sam/python/src/examples/datagram.py new file mode 100644 index 000000000..276d0f264 --- /dev/null +++ b/apps/sam/python/src/examples/datagram.py @@ -0,0 +1,15 @@ + +# ----------------------------------------------- +# datagram.py: Datagram client +# ----------------------------------------------- + +from i2p import sam + +dest = sam.resolve('yourserver.i2p') +S = sam.socket('Bob', sam.SOCK_DGRAM) +S.sendto('Hello packet', 0, dest) + +# Get packet up to 1000 bytes -- the rest is discarded. +(data, dest) = S.recvfrom(1000) + +print data diff --git a/apps/sam/python/src/examples/datagram_noblock.py b/apps/sam/python/src/examples/datagram_noblock.py new file mode 100644 index 000000000..2cbc742e0 --- /dev/null +++ b/apps/sam/python/src/examples/datagram_noblock.py @@ -0,0 +1,20 @@ + +# --------------------------------------------------- +# datagram_noblock.py: Non-blocking datagram server +# --------------------------------------------------- + +from i2p import sam +import time + +S = sam.socket('Eve', sam.SOCK_DGRAM) +print 'Serving at:', S.dest +S.setblocking(False) + +while True: + try: + (data, dest) = S.recvfrom(1000) # Read packet + print 'Got', data, 'from', dest + S.sendto('Hi client!', 0, dest) + except sam.BlockError: # No data available + pass + time.sleep(0.01) # Reduce CPU usage \ No newline at end of file diff --git a/apps/sam/python/src/examples/datagram_server.py b/apps/sam/python/src/examples/datagram_server.py new file mode 100644 index 000000000..24a8a985d --- /dev/null +++ b/apps/sam/python/src/examples/datagram_server.py @@ -0,0 +1,14 @@ + +# ----------------------------------------------- +# datagram_server.py: Datagram server +# ----------------------------------------------- + +from i2p import sam + +S = sam.socket('Eve', sam.SOCK_DGRAM) +print 'Serving at:', S.dest + +while True: + (data, dest) = S.recvfrom(1000) # Read packet + print 'Got', data, 'from', dest + S.sendto('Hi client!', 0, dest) diff --git a/apps/sam/python/src/examples/dos.py b/apps/sam/python/src/examples/dos.py new file mode 100644 index 000000000..0868a0a54 --- /dev/null +++ b/apps/sam/python/src/examples/dos.py @@ -0,0 +1,33 @@ + +# ----------------------------------------------- +# dos.py: Noneffective denial of service tool +# ----------------------------------------------- + +from i2p import sam +import threading, sys + +def dos_stream(dest): + """Perform a DOS attack on a stream server.""" + dest = sam.resolve(dest) + + # DOS code, runs in n separate threads. + def f(): + while True: + S = sam.socket(dest, sam.SOCK_STREAM) + S.connect(dest) + S.send('GET / HTTP/1.0\r\n\r\n') + S.close() + + # Start up the threads. + for i in range(128): + T = threading.Thread(target=f) + T.start() + +def syntax(): + print "Usage: python dos.py Destination" + +if __name__ == '__main__': + if len(sys.argv) == 2: + dos_stream(sys.argv[1]) + else: + syntax() diff --git a/apps/sam/python/src/examples/examples.txt b/apps/sam/python/src/examples/examples.txt new file mode 100644 index 000000000..0a1f37809 --- /dev/null +++ b/apps/sam/python/src/examples/examples.txt @@ -0,0 +1,14 @@ + +Examples: + +datagram.py - Datagram client +datagram_noblock.py - Non-blocking datagram server +datagram_server.py - Blocking datagram server +dos.py - Denial of service tool +raw.py - Raw client +raw_noblock.py - Non-blocking raw server +raw_server.py - Raw server +stream.py - Stream client +stream_eepget.py - Get an eepsite using sockets +stream_noblock.py - Non-blocking stream server +stream_server.py - Blocking stream server diff --git a/apps/sam/python/src/examples/raw.py b/apps/sam/python/src/examples/raw.py new file mode 100644 index 000000000..9a0af60c3 --- /dev/null +++ b/apps/sam/python/src/examples/raw.py @@ -0,0 +1,10 @@ + +# ----------------------------------------------- +# raw.py: Raw client +# ----------------------------------------------- + +from i2p import sam + +dest = sam.resolve('yourserver.i2p') # Send to dest +S = sam.socket('Carol', sam.SOCK_RAW) +S.sendto('Hello packet', 0, dest) diff --git a/apps/sam/python/src/examples/raw_noblock.py b/apps/sam/python/src/examples/raw_noblock.py new file mode 100644 index 000000000..777ef9a9c --- /dev/null +++ b/apps/sam/python/src/examples/raw_noblock.py @@ -0,0 +1,19 @@ + +# --------------------------------------------------- +# raw_noblock.py: Non-blocking raw server +# --------------------------------------------------- + +from i2p import sam +import time + +S = sam.socket('Eve', sam.SOCK_RAW) +print 'Serving at:', S.dest +S.setblocking(False) + +while True: + try: + data = S.recv(1000) # Read packet + print 'Got', data + except sam.BlockError: # No data available + pass + time.sleep(0.01) # Reduce CPU usage \ No newline at end of file diff --git a/apps/sam/python/src/examples/raw_server.py b/apps/sam/python/src/examples/raw_server.py new file mode 100644 index 000000000..a337b1487 --- /dev/null +++ b/apps/sam/python/src/examples/raw_server.py @@ -0,0 +1,13 @@ + +# ----------------------------------------------- +# raw_server.py: Raw server +# ----------------------------------------------- + +from i2p import sam + +S = sam.socket('Eve', sam.SOCK_RAW) +print 'Serving at:', S.dest + +while True: + data = S.recv(1000) # Read packet + print 'Got', data diff --git a/apps/sam/python/src/examples/stream.py b/apps/sam/python/src/examples/stream.py new file mode 100644 index 000000000..b219a1df4 --- /dev/null +++ b/apps/sam/python/src/examples/stream.py @@ -0,0 +1,11 @@ + +# ----------------------------------------------- +# stream.py: Simple stream client +# ----------------------------------------------- + +from i2p import sam + +S = sam.socket('Alice', sam.SOCK_STREAM) +S.connect('duck.i2p') +S.send('GET / HTTP/1.0\r\n\r\n') # Send request +print S.recv(1000) # Read up to 1000 bytes diff --git a/apps/sam/python/src/examples/stream_eepget.py b/apps/sam/python/src/examples/stream_eepget.py new file mode 100644 index 000000000..366f3bb56 --- /dev/null +++ b/apps/sam/python/src/examples/stream_eepget.py @@ -0,0 +1,17 @@ + +# ----------------------------------------------- +# stream_eepget.py: Get an eepsite using sockets +# ----------------------------------------------- + +from i2p import sam + +S = sam.socket('Alice', sam.SOCK_STREAM) +S.connect('duck.i2p') +S.send('GET / HTTP/1.0\r\n\r\n') # Send request +f = S.makefile() # File object + +while True: # Read header + line = f.readline().strip() # Read a line + if line == '': break # Content begins + +print f.read() # Read file object diff --git a/apps/sam/python/src/examples/stream_noblock.py b/apps/sam/python/src/examples/stream_noblock.py new file mode 100644 index 000000000..762a94632 --- /dev/null +++ b/apps/sam/python/src/examples/stream_noblock.py @@ -0,0 +1,39 @@ + +# ----------------------------------------------- +# stream_noblock.py: Non-blocking stream server +# ----------------------------------------------- + +import i2p +from i2p import sam +import thread, time + +S = sam.socket('Dave', sam.SOCK_STREAM) +S.listen(10) # Queue up to 10 connections +S.setblocking(False) # Non-blocking +print 'Serving at:', S.dest + +def handle_connection(C): + """Handle a single connection in a thread of its own.""" + try: + f = C.makefile() # File object + req = f.readline() # Read HTTP request + + s = '

Hello!

' # String to send back + + f.write('HTTP/1.0 200 OK\r\nContent-Type: text/html' + + '\r\nContent-Length: ' + str(int(len(s))) + '\r\n\r\n' + s) + + f.close() # Close file + C.close() # Close connection + except sam.Error, e: + # Recover from SAM errors + print 'Warning:', str(e) + +while True: + try: + (C, remotedest) = S.accept() # Get a connection + thread.start_new_thread(handle_connection, (C,)) + except sam.BlockError: + # Ignore 'command would have blocked' errors + pass + time.sleep(0.01) # Reduce CPU usage \ No newline at end of file diff --git a/apps/sam/python/src/examples/stream_server.py b/apps/sam/python/src/examples/stream_server.py new file mode 100644 index 000000000..a76e518d1 --- /dev/null +++ b/apps/sam/python/src/examples/stream_server.py @@ -0,0 +1,28 @@ + +# ----------------------------------------------- +# stream_server.py: Simple stream server +# ----------------------------------------------- + +import i2p +from i2p import sam + +S = sam.socket('Dave', sam.SOCK_STREAM) +S.listen(10) # Queue up to 10 connections +print 'Serving at:', S.dest + +while True: + try: + (C, remotedest) = S.accept() # Get a connection + f = C.makefile() # File object + req = f.readline() # Read HTTP request + + s = '

Hello!

' # String to send back + + f.write('HTTP/1.0 200 OK\r\nContent-Type: text/html' + + '\r\nContent-Length: ' + str(int(len(s))) + '\r\n\r\n' + s) + + f.close() # Close file + C.close() # Close connection + except sam.Error, e: + # Recover from SAM errors + print 'Warning:', str(e) diff --git a/apps/sam/python/src/i2p/__init__.py b/apps/sam/python/src/i2p/__init__.py new file mode 100644 index 000000000..14d6d9f14 --- /dev/null +++ b/apps/sam/python/src/i2p/__init__.py @@ -0,0 +1,20 @@ + +""" +i2p -- I2P Python interface +""" + +__all__ = ['Error', 'RouterError', 'sam', 'eep', 'router'] + +class Error(Exception): + """Base class for all I2P errors.""" + +class RouterError(Error): + """Could not connect to router.""" + +import sam +import eep +import router + +# Internal use only +#import samclasses as _samclasses + diff --git a/apps/sam/python/src/i2p/eep.py b/apps/sam/python/src/i2p/eep.py new file mode 100644 index 000000000..08b410c2c --- /dev/null +++ b/apps/sam/python/src/i2p/eep.py @@ -0,0 +1,47 @@ + +# ------------------------------------------------------------- +# eep.py: I2P Project -- Eeproxy Python API +# ------------------------------------------------------------- + +""" +Eeproxy Python API +""" + +import urllib2 + +eepaddr = '127.0.0.1:4444' # Default port for eeproxy + +# -------------------------------------------------- +# Functions +# -------------------------------------------------- + +def urlopen(url, eepaddr=eepaddr): + """Like urllib2.urlopen(url), but only works for eep-sites. + Example: f = urlopen('http://duck.i2p/index.html')""" + if url.find('http://') != 0: url = 'http://' + url + + # Handle I2P Destination + if len(url) >= 256: + suffix = url[len('http://'):] + if suffix[:4] != 'i2p/': url = 'http://i2p/' + suffix + + # Add trailing slash + if url.find('/', len('http://')) < 0: url = url + '/' + + # Remove http:// and trailing slash from eepaddr. + if eepaddr.find('http://') == 0: eepaddr = eepaddr[len('http://'):] + eepaddr = eepaddr.rstrip('/') + + proxy = urllib2.ProxyHandler( \ + {'http': 'http://anonymous:passwd@' + eepaddr}) + opener = urllib2.build_opener(proxy, \ + urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler) + return opener.open(url) + +def urlget(url, eepaddr=eepaddr): + """Get contents of an eepsite. + Example: urlget('http://duck.i2p/').""" + f = urlopen(url, eepaddr=eepaddr) + ans = f.read() + f.close() + return ans diff --git a/apps/sam/python/src/i2p/router.py b/apps/sam/python/src/i2p/router.py new file mode 100644 index 000000000..b625b0ace --- /dev/null +++ b/apps/sam/python/src/i2p/router.py @@ -0,0 +1,188 @@ + +# ------------------------------------------------------------- +# router.py: I2P Project -- Router Control API for Python +# ------------------------------------------------------------- + +""" +Router Control API for Python +""" + +import i2p +import i2p.sam +import i2p.eep + +import socket as pysocket +import os, sys +import os.path +import time +import threading +import urllib2 + +check_addrlist = [i2p.sam.samaddr, i2p.eep.eepaddr] + +router_config = 'router.config' # Router config filename + +# True if our Python program started the router +our_router = False +our_router_lock = threading.Lock() + + +def find(dir=None): + """Find the absolute path to a locally installed I2P router. + + An I2P installation is located by looking in the + environment I2P, then in PATH, then in the dir argument + given to the function. It looks for startRouter.sh or + startRouter.bat. Raises ValueError if an I2P installation + could not be located. + """ + if sys.platform[:3] == 'win': + sep = ';' + else: + sep = ':' + L = [] + if 'PATH' in os.environ: L += os.environ['PATH'].split(sep) + if 'I2P' in os.environ: L += os.environ['I2P'].split(sep) + if dir != None and dir != '': L += dir.split(sep) + for dirname in L: + filename = os.path.join(dirname, 'startRouter.bat') + if os.path.exists(filename): + return dirname + filename = os.path.join(dirname, 'startRouter.sh') + if os.path.exists(filename): + return dirname + raise ValueError('I2P installation not found') + + +def check(dir=None): + """Checks whether a locally installed router is running. Does + nothing if successful, otherwise raises i2p.RouterError. + + An I2P installation is located by using find(dir). + The router.config file is parsed for 'router.adminPort'. + This port is queried to determine whether the router is + running. + """ + config = _parse_config(os.path.join(find(dir), router_config)) + port = config.get('router.adminPort', '') + try: + port = int(port) + except: + raise ValueError('router.adminPort missing or bad in ' + + router_config) + + try: + s = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_STREAM) + s.connect(('127.0.0.1', port)) + s.close() + except pysocket.error: + raise i2p.RouterError('could not contact 127.0.0.1:' + str(port)) + +def _run_program(filename): + """Runs the given program in a new process and new terminal.""" + if sys.platform[:3] == 'win': + os.startfile(filename) + global our_router + our_router = True + else: + # Linux possibilities: + # sh -c command + # xterm -e command + # bash -c command + # Try os.spawnl() with the above. + raise ValueError('unimplemented') + +def start(dir=None, hidden=False): + """Start a locally installed I2P router. Does nothing if + the router has already been started. + + An I2P installation is located by using find(dir). + + If hidden is True, do not show a terminal for the router. + """ + routerdir = find(dir) + router = os.path.join(routerdir, 'startRouter.bat') + try: + check(dir) + return # Already running + except: + pass # Not yet running + + olddir = os.getcwd() + + if hidden: + raise ValueError('unimplemented') + + our_router_lock.acquire() + try: + os.chdir(routerdir) + try: + _run_program(router) + finally: + os.chdir(olddir) + finally: + our_router_lock.release() + + # Ideas for hidden=True: + # Parse startRouter.bat, and run same command with javaw + # on Windows to hide command box. + # Perhaps use javaw (?) or javaws (j2sdk1.4.2/jre/javaws/javaws) + # Perhaps /path-to/program 2>/dev/null 1>/dev/null& + +def _parse_config(filename): + """Return a dict with (name, value) items for the given I2P configuration file.""" + f = open(filename, 'r') + s = f.read() + f.close() + ans = {} + for line in s.split('\n'): + line = line.strip() + if '#' in line: line = line[:line.find('#')] + pair = line.split('=') + if len(pair) == 2: + ans[pair[0].strip()] = pair[1].strip() + return ans + +def stop(dir=None, force=False): + """Stop a locally installed I2P router, if it was started by + the current Python program. If force is True, stop the + router even if it was started by another process. Do nothing + if force is False and the router was started by another program. + + The file 'router.config' is located using the same search + process used for find(dir). It is parsed for + 'router.shutdownPassword' and 'router.adminPort'. The + router is shut down through the admin port. + + Raises i2p.RouterError if the I2P router is running but cannot + be stopped. You must uncomment the + 'router.shutdownPassword' line for this command to work. + """ + if force == False and our_router == False: + return + + config = _parse_config(os.path.join(find(dir), router_config)) + + password = config.get('router.shutdownPassword', '') + if password == '': + raise ValueError('router.shutdownPassword not found in ' + + router_config) + admin_port = config.get('router.adminPort', '') + if admin_port == '': + raise ValueError('router.adminPort not found in ' + router_config) + + try: + admin_port = int(admin_port) + except: + raise ValueError('invalid router.adminPort in ' + router_config) + + try: + sock = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_STREAM) + sock.connect(('127.0.0.1', admin_port)) + sock.send('GET /shutdown?password=' + password + ' HTTP/1.0\r\n\r\n') + time.sleep(0.01) + sock.close() + except: + raise i2p.RouterError('router shutdown failed') + + # Assume shutdown succeeded (it will take 30 seconds). diff --git a/apps/sam/python/src/i2p/sam.py b/apps/sam/python/src/i2p/sam.py new file mode 100644 index 000000000..f9e8fb15b --- /dev/null +++ b/apps/sam/python/src/i2p/sam.py @@ -0,0 +1,706 @@ + +# ------------------------------------------------------------- +# sam.py: I2P Project -- SAM Python API +# ------------------------------------------------------------- + +""" +SAM Python API +""" + +import i2p + +import samclasses, threading, time, copy, Queue, thread +import socket as pysocket +import select as pyselect + +# -------------------------------------------------- +# Global variables +# -------------------------------------------------- + +# Ports +samaddr = '127.0.0.1:7656' # Default port for SAM Bridge + +# Flags for recv, recvfrom. +MSG_PEEK = 2 # Peek at incoming message +MSG_WAITALL = 64 # Wait for data or error +MSG_DONTWAIT = 128 # Nonblocking + +# Packet sizes +MAX_DGRAM = 31744 # Max size of datagram packet +MAX_RAW = 32768 # Max size of raw packet + +# Socket types +SOCK_STREAM = 1 # Stream socket +SOCK_DGRAM = 2 # Datagram socket +SOCK_RAW = 3 # Raw socket + +# Miscellaneous +samver = 1.0 # SAM version implemented + +# -------------------------------------------------- +# Errors +# -------------------------------------------------- + +class Error(i2p.Error): + """Base class for all SAM errors.""" + +class NetworkError(Error): + """Network error occurred within I2P. + The error object is a 2-tuple: (errtag, errdesc). + errtag is a SAM error string, + errdesc is a human readable error description. + """ + +class ClosedError(Error): + """A command was used on a socket that closed gracefully.""" + +class BlockError(Error): + """Socket call would have blocked.""" + +class Timeout(Error): + """Time out occurred for a socket which had timeouts enabled + via a prior call to settimeout().""" + +# -------------------------------------------------- +# Sockets +# -------------------------------------------------- + +# Note: socket(), __make_session() and Socket() should have same args +def socket(session, type, samaddr=samaddr, **kwargs): + r"""Create a new socket. Argument session should be a session + name -- if the name has not yet been used, an I2P + Destination will be created for it, otherwise, the + existing Destination will be re-used. An empty session + string causes a transient session to be created. Argument + type is one of SOCK_STREAM, SOCK_DGRAM, or SOCK_RAW. + + I2P configuration keyword arguments: + + * in_depth - depth of incoming tunnel (default 2) + * out_depth - depth of outgoing tunnel (default 2) + + A single session may be shared by more than one socket, if + the sockets are the same type, and if the sockets are + created within the same Python process. The socket + objects are multithread-safe. + + Examples: + a = i2p.socket('Alice', i2p.SOCK_STREAM) + b = i2p.socket('Bob', i2p.SOCK_DGRAM, + in_depth=2, out_depth=5) + + The created object behaves identically to a socket from + module socket, with the following exceptions: + + * I2P Destinations are used as address arguments [1]. + * bind is a no-op: sockets are always bound. + * send* methods send all data and are non-blocking. + + A given session name can only be open in a single Python + program at a time. If you need to overcome this + limitation, consider patching I2P. + + [1]. Alternatively, a host name can be used as an address. + It will be resolved using hosts.txt. + + For details on how to use socket objects, see + http://www.python.org/doc/current/lib/socket-objects.html + + See the examples directory for code examples. + """ + + return Socket(session, type, samaddr, **kwargs) + + +# -------------------------------------------------- +# Socket session objects +# -------------------------------------------------- + +# Global list of session objects. +_sessions = {} +_session_lock = threading.Lock() + +def _make_session(session, type, samaddr, **kwargs): + """Make a session object (eg samclasses.StreamSession). Same + arguments as socket(). Return an existing session object + if one has been previously created under the given name. + """ + # Synchronize + _session_lock.acquire() + try: + if type == SOCK_STREAM: C = samclasses.StreamSession + elif type == SOCK_DGRAM: C = samclasses.DatagramSession + elif type == SOCK_RAW: C = samclasses.RawSession + else: raise ValueError('bad socket type') + # Get existing session, if available + if session != '' and _sessions.has_key(session): + if _sessions[session].__class__ != C: + raise ValueError('session ' + repr(session) + ' was ' + + 'created with a different session type.') + return _sessions[session] + # Create new session + if type == SOCK_STREAM: ans = C(session, samaddr, **kwargs) + elif type == SOCK_DGRAM: ans = C(session, samaddr, **kwargs) + elif type == SOCK_RAW: ans = C(session, samaddr, **kwargs) + if session != '': _sessions[session] = ans + return ans + finally: _session_lock.release() + +def _wrap_stream(stream, parent_socket): + """Wraps a Socket object around a samclasses.Stream object.""" + s = Socket('', 0, dummy_socket=True) + s.sessobj = stream + s.remotedest = stream.remotedest + s.dest = parent_socket.dest + s.session = parent_socket.session + s.type = parent_socket.type + s.timeout = None + s.samaddr = parent_socket.samaddr + s.closed = False + return s + +# -------------------------------------------------- +# Socket class +# -------------------------------------------------- + +class Socket: + """A socket object.""" + + # Docstrings for pydoc. These variables will be overwritten. + dest = property(doc='Local I2P Destination of socket') + session = property(doc='Session name') + type = property(doc='Socket type: SOCK_STREAM, SOCK_DGRAM,' + + ' or SOCK_RAW.') + # FIXME: Use getsockopt to detect errors. + + def __init__(self, session, type, samaddr=samaddr, **kwargs): + """Equivalent to socket().""" + if kwargs.has_key('dummy_socket'): return + self.sessobj = _make_session(session, type, samaddr, **kwargs) + self.dest = self.sessobj.dest + self.session = session + self.type = type + self.timeout = None # None indicates blocking mode + self.samaddr = samaddr + self.closed = False # Was current object closed? + self.lock = threading.Lock() + + def _verify_open(self): + """Verify that the socket has not been closed.""" + if self.closed == True: + raise ClosedError('socket closed') + + def _verify_stream(self): + """Raise an error if socket is not a SOCK_STREAM.""" + if self.type != SOCK_STREAM: + raise i2p.Error('operation not supported') + # FIXME: Check for errors also. + + def _verify_connected(self, needs_to_be_connected=True): + """Raise an error if socket is not a connected stream socket.""" + self._verify_stream() + if not hasattr(self.sessobj, 'remotedest'): + raise i2p.Error('socket is not connected') + if needs_to_be_connected and not self.sessobj.didconnect: + raise i2p.Error('socket is in the process of connecting') + # FIXME: Check for errors also. + + def _verify_not_connected(self): + """Verify that the socket is not currently connected, and is not + in the process of connecting.""" + self._verify_stream() + if hasattr(self.sessobj, 'remotedest'): + raise i2p.Error('socket is already connected') + # FIXME: Check for errors also. + + def accept(self): + self._verify_open() + self._verify_not_connected() + # Raises BlockError or Timeout if not ready. + C = _wrap_stream(self.sessobj.accept(self.timeout), self) + return (C, C.remotedest) + + def bind(self, address): + self._verify_open() + self._verify_not_connected() + + def close(self): + try: + self._verify_connected() + connected = True + except i2p.Error: + connected = False + if connected: + # Close the Stream object. + self.sessobj.close() + else: + # Never close a session object. + pass + self.closed = True + + def connect(self, address): + # Synchronized. Lock prevents two connects from occurring at the + # same time in different threads. + self.lock.acquire() + try: + self._verify_open() + if self.type == SOCK_DGRAM or self.type == SOCK_RAW: + self.packet_dest = address + return + + self._verify_not_connected() + address = resolve(address, self.samaddr) + + timeout = self.timeout + unwrap = self.sessobj.connect(address, timeout=timeout) + w = _wrap_stream(unwrap, self) + self.sessobj = w.sessobj + self.remotedest = w.remotedest + + if self.sessobj.err != None: + raise self.sessobj.err + + # Raise error if not yet connected + if not self.sessobj.didconnect: + if timeout == 0.0: + raise BlockError('command would have blocked. use ' + + 'select() to find when socket is connected') + else: raise Timeout('timed out. use select() to find ' + + 'when socket is connected') + + finally: self.lock.release() + + def connect_ex(self, address): + try: self.connect(address) + except i2p.Error, e: return e + + def getpeername(self): + self._verify_connected() + return self.remotedest + + def getsockname(self): + return self.dest + + def listen(self, backlog): + self._verify_open() + self._verify_not_connected() + self.sessobj.listen(backlog) + + def makefile(self, mode='r', bufsize=-1): + self._verify_open() + self._verify_connected() + return pysocket._fileobject(self, mode, bufsize) + + def recv(self, bufsize, flags=0): + # FIXME: What about recv'ing if connected in asynchronous mode? + # It is acceptable to call recv() after a stream has closed + # gracefully. It is an error to call recv() after a stream has + # closed due to an I2P network error. + timeout = self.timeout + (peek, waitall, dontwait) = \ + (flags & MSG_PEEK, flags & MSG_WAITALL, flags & MSG_DONTWAIT) + if dontwait: timeout = 0.0 + + if self.type == SOCK_STREAM: + self._verify_connected() + return self.sessobj.recv(bufsize, timeout, peek, waitall) + else: + return self.recvfrom(bufsize, flags)[0] + + def recvfrom(self, bufsize, flags=0): + """For a datagram or raw socket, bufsize = -1 indicates that the + entire packet should be retrieved.""" + timeout = self.timeout + (peek, waitall, dontwait) = \ + (flags & MSG_PEEK, flags & MSG_WAITALL, flags & MSG_DONTWAIT) + if dontwait: timeout = 0.0 + + if self.type == SOCK_STREAM: + self._verify_connected() + if bufsize < 0: raise ValueError('bufsize must be >= 0') + return (self.sessobj.recv(bufsize, timeout, peek, waitall), \ + self.remotedest) + else: + return self.sessobj.recv(timeout, peek)[:bufsize] + + def send(self, string, flags=0): + self._verify_open() + if self.type == SOCK_DGRAM or self.type == SOCK_RAW: + if not hasattr(self, 'packet_dest'): + raise i2p.Error('use connect or sendto to specify a ' + + 'Destination') + self.sendto(string, flags, self.packet_dest) + return + + self._verify_connected() + if self.closed: + raise i2p.Error('send operation on closed socket') + # FIXME: What about send'ing if connected in asynchronous mode? + self.sessobj.send(string) + + def sendall(self, string, flags=0): + self.send(string) + + def sendto(self, string, flags, address): + self._verify_open() + if not self.type in [SOCK_DGRAM, SOCK_RAW]: + raise i2p.Error('operation not supported') + if self.closed: + raise i2p.Error('sendto operation on closed socket') + address = resolve(address, self.samaddr) + self.sessobj.send(string, address) + + def setblocking(self, flag): + if flag: self.timeout = None + else: self.timeout = 0.0 + + def settimeout(self, value): + self.timeout = value + + def gettimeout(self): + return self.timeout + + def __deepcopy__(self, memo): + return copy.copy(self) + +# -------------------------------------------------- +# Poll and select +# -------------------------------------------------- + +POLLIN = 1 # There is data to read +POLLPRI = 1 # Same as POLLIN +POLLOUT = 4 # Ready for output +POLLERR = 8 # Wait for error condition +POLLHUP = 16 # Not implemented +POLLNVAL = 32 # Not implemented + +class Poll: + """Class implementing poll interface. Works for Python sockets + and SAM sockets.""" + def __init__(self): + self.fds = {} # Maps _hash() -> (socket, mask) + def _hash(self, fd): + if isinstance(fd, int): + return fd # Use the fd itself if integer. + else: + return id(fd) # Use object address (no copies!) + def register(self, fd, eventmask=POLLIN|POLLOUT|POLLERR): + self.fds[self._hash(fd)] = (fd, eventmask) + def unregister(self, fd): + del self.fds[self._hash(fd)] + def poll(self, timeout=None): + readlist, writelist, errlist = [], [], [] + for F, mask in self.fds: + if mask & POLLIN: readlist += [F] + if mask & POLLOUT: writelist += [F] + if mask & POLLERR: errlist += [F] + (Rs, Ws, Es) = select(readlist, writelist, errlist, + timeout=timeout) + ans = [] + for R in Rs: ans.append((R, POLLIN)) + for W in Ws: ans.append((W, POLLOUT)) + for E in Es: ans.append((E, POLLERR)) + return ans + +def poll(): + """Returns a polling object. Works on SAM sockets and Python + sockets. See select.poll() in the Python library for more + information.""" + return Poll() + +def select(readlist, writelist, errlist, timeout=None): + """Performs a select call. Works on SAM sockets and Python + sockets. See select.select() in the Python library for more + information.""" + Rans = [] + Wans = [] + Eans = [] + if timeout != None: end = time.clock() + timeout + while True: + # FIXME: Check performance. + # Use pysocket.poll for Python sockets, if needed for speed. + + # Check for read availability. + for R in readlist: + if isinstance(R, int) or hasattr(R, 'fileno'): + # Python socket + if len(pyselect.select([R], [], [], 0.0)[0]) > 0: + Rans.append(R) + else: + # SAM Socket + if R.type == SOCK_STREAM: + try: + R._verify_connected() + Rans.append(R) + except: + pass + else: + if len(R) > 0: Rans.append(R) + + # Check for write availability. + for W in writelist: + if isinstance(W, int) or hasattr(W, 'fileno'): + # Python socket + if len(pyselect.select([], [W], [], 0.0)[1]) > 0: + Wans.append(W) + else: + # SAM Socket + if W.type == SOCK_STREAM: + try: + W._verify_connected() + Wans.append(W) + except: + pass + else: + Wans.append(W) + + # Check for error conditions. + # These can only be stream errors. + for E in errlist: + if isinstance(E, int) or hasattr(E, 'fileno'): + # Python socket + if len(pyselect.select([], [], [E], 0.0)[2]) > 0: + Eans.append(E) + else: + if E.type == SOCK_STREAM: + try: + # FIXME: Use a ._get_error() function for errors. + # Socket can only have an error if it connected. + E._verify_connected() + if E.sessobj.err != None: + Eans.append(E) + except: + pass + if timeout != None and time.clock() >= end: break + if len(Rans) != 0 or len(Wans) != 0 or len(Eans) != 0: break + + samclasses.sleep() + + return (Rans, Wans, Eans) + +def resolve(host, samaddr=samaddr): + """Resolve I2P host name --> I2P Destination. + Returns the same string if host is already a Destination.""" + if host.find('http://') == 0: host = host[len('http://'):] + host = host.rstrip('/') + if len(host) >= 256: return host + S = samclasses.BaseSession(samaddr) + ans = S._namelookup(host) + S.close() + return ans + +def _exchange_data(A, B): + """Exchanges data A <-> B between open stream sockets A and B.""" + # FIXME: There's recv errors that we should be shutting + # down sockets for, but this seems to work OK. + err = None + try: + # Send data from A -> B while available. + while True: + # A -> B. + A.setblocking(False) + try: s = A.recv(1024) + except Exception, e: s = None + if s == '': raise ClosedError + if s == None: + # Stop sending A -> B. + break + else: + B.setblocking(True) + B.sendall(s) + except Exception, e: + err = e + + try: + # Send data from B -> A while available. + while True: + # B -> A. + B.setblocking(False) + try: s = B.recv(1024) + except Exception, e: s = None + if s == '': raise ClosedError + if s == None: + # Stop sending B -> A. + break + else: + A.setblocking(True) + A.sendall(s) + except Exception, e: + err = e + + # Re-raise error after finishing communications both ways. + if err != None: raise err + +def _test_connected(B): + """Raises an error if socket B is not yet connected.""" + [Rlist, Wlist, Elist] = select([B], [B], [B], 0.0) + if len(Wlist) == 0: + raise ValueError('socket not yet connected') + +class Tunnel: + def __init__(self, receive, make_send, nconnect=-1, timeout=60.0): + """A Tunnel relays connections from a 'receive' socket to one + or more 'send' sockets. The receive socket must be bound + and listening. For each incoming connection, a new send + socket is created by calling make_send(). Data is then + exchanged between the created streams until one socket is + closed. nconnect is the maximum number of simultaneous + connections (-1 for infinite), and timeout is the time that + a single connection can last for (None allows a connection + to last forever). + + Sockets must accept stream traffic and support the Python + socket interface. A separate daemonic thread is created to + manage the tunnel. For high performance, make_send() should + make a socket and connect in non-blocking mode (you should + catch and discard the sam.BlockError or socket.error due to + executing connect on a non-blocking socket). + + Security Note: + A firewall is needed to maintain the end user's anonymity. + An attacker could keep a tunnel socket open by pinging it + regularly. The accepted sockets from 'receive' must prevent + this by closing down eventually. + + Socket errors do not cause the Tunnel to shut down. + """ + + self.receive = receive + self.make_send = make_send + self.receive.setblocking(False) + self.alive = True + self.nconnect = nconnect + self.timeout = timeout + T = threading.Thread(target=self._run, args=()) + T.setDaemon(True) + T.start() + + def _run(self): + """Manage the tunnel in a separate thread.""" + tunnels = [] + + while True: + # Look for a new connection + if self.nconnect < 0 or len(tunnels) < self.nconnect: + (A, B) = (None, None) + try: + (A, ignoredest) = self.receive.accept() + A.setblocking(False) + B = self.make_send() + B.setblocking(False) + if self.timeout != None: t = time.time() + self.timeout + else: t = None + tunnels.append((A, B, t)) + except Exception, e: + try: + if A != None: + A.setblocking(False); A.close() + except Exception, e: pass + try: + if B != None: + B.setblocking(False); B.close() + except Exception, e: pass + + # Send data between existing connections + new_tunnels = [] + for (A, B, t) in tunnels: + # For each connection pair, send data. + try: + if t != None: assert time.time() <= t + # Test whether B is successfully connected + _test_connected(B) + + # Send A <-> B. + _exchange_data(A, B) + + if self.timeout != None: t = time.time() + self.timeout + else: t = None + new_tunnels.append((A, B, t)) + except Exception, e: + # Catch errors. Kill the connection if it's been at + # least timeout seconds since last non-erroneous call + # to _exchange_data, or if stream was closed. This + # allows stream-not-finished-connecting errors to be + # dropped within the timeout. + time_ok = True + if self.timeout != None: + if time.time() > t: time_ok = False + if time_ok and not isinstance(e, ClosedError): + # Don't kill connection yet + new_tunnels.append((A, B, t)) + else: + # We've only gotten errors for 'timeout' s. + # Drop the connection. + try: A.setblocking(False); A.close() + except Exception, e: pass + try: B.setblocking(False); B.close() + except Exception, e: pass + tunnels = new_tunnels + time.sleep(0.01) + + # Shut down all connections if self.close() was called. + if not self.alive: + for (A, B, t) in tunnels: + try: A.setblocking(False); A.close() + except: pass + try: B.setblocking(False); B.close() + except: pass + break + + def close(self): + """Close all connections made for this tunnel.""" + self.alive = False + +class TunnelServer(Tunnel): + dest = property(doc='I2P Destination of server.') + session = property(doc='Session name for server.') + def __init__(self, session, port, samaddr=samaddr, nconnect=-1, + timeout=None, **kwargs): + """Tunnels incoming SAM streams --> localhost:port. + + nconnect and timeout are the maximum number of connections + and maximum time per connection. All other arguments are + passed to sam.socket(). This call blocks until the tunnel + is ready.""" + S = socket(session, SOCK_STREAM, samaddr, **kwargs) + S.listen(64) + self.session = session + self.dest = S.dest + def make_send(): + C = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_STREAM) + C.setblocking(False) + try: C.connect(('127.0.0.1', port)) + except: pass # Ignore 'would have blocked' error + return C + Tunnel.__init__(self, S, make_send, nconnect, timeout) + +class TunnelClient(Tunnel): + remotedest = property(doc='Remote Destination.') + dest = property('Local Destination used for routing.') + session = property('Session name for local Destination.') + def __init__(self, session, port, dest, samaddr=samaddr, + nconnect=-1, timeout=None, **kwargs): + """Tunnels localhost:port --> I2P Destination dest. + + A session named 'session' is created locally, for purposes + of routing to 'dest'. nconnect and timeout are the maximum + number of connections and maximum time per connection. All + other arguments are passed to sam.socket(). This call blocks + until the tunnel is ready.""" + S = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_STREAM) + S.bind(('', port)) + S.listen(4) + obj = socket(session, SOCK_STREAM, samaddr, **kwargs) + self.session = session + self.dest = obj.dest + def make_send(): + C = socket(session, SOCK_STREAM, samaddr, **kwargs) + C.setblocking(False) + try: C.connect(dest) + except: pass # Ignore 'would have blocked' error + return C + Tunnel.__init__(self, S, make_send, nconnect, timeout) + +# -------------------------------------------------- +# End of File +# -------------------------------------------------- diff --git a/apps/sam/python/src/i2p/samclasses.py b/apps/sam/python/src/i2p/samclasses.py new file mode 100644 index 000000000..db402b8f8 --- /dev/null +++ b/apps/sam/python/src/i2p/samclasses.py @@ -0,0 +1,769 @@ + +# ------------------------------------------------------------- +# samclasses.py: Lower-level SAM API, interfaces with SAM Bridge. +# ------------------------------------------------------------- + +""" +Lower-level SAM API, interfaces with SAM Bridge. + +For internal use only. + +Use the higher level i2p.sam module for your own programs. + +For details on SAM, see "Simple Anonymous Messaging (SAM) v1.0," +as published by jrandom. + +Class Overview: + + SAMTerminal: Message sender/reader, talks to SAM Bridge + through a single socket. + StringBuffer: Queue for character data. + BaseSession: SAM session classes are derived from this. + StreamSession: Manipulate a SAM stream session through a + threadsafe, high-level interface. + DatagramSession: SAM datagram session, threadsafe, high level. + RawSession: SAM raw session, threadsafe, high level. + +Note that a 'None' timeout is an infinite timeout: it +blocks forever if necessary. + +Todo: + * Error handling is a huge mess. Neaten it up. + Subclass a ErrorMixin class, then use set_error(e), + check_error(), get_error(). + * Streams are a huge mess. Neaten them up. + * This whole interface is a tad confusing. Neaten it up. +""" + +# --------------------------------------------------------- +# Imports +# --------------------------------------------------------- + +import socket, thread, threading, time, string +import Queue, traceback, random, sys, shlex + +# --------------------------------------------------------- +# Import i2p and i2p.sam (for defaults and errors) +# --------------------------------------------------------- + +import i2p +import i2p.sam + +# --------------------------------------------------------- +# Functions +# --------------------------------------------------------- + +def sleep(): time.sleep(0.01) # Sleep between thread polls + +sam_log = False # Logging flag. Logs to ./log.txt. + +# ----------------------------------------------------- +# SAMTerminal +# ----------------------------------------------------- + +class SAMTerminal: + """Message-by-message communication with SAM through a single + socket. _on_* messages are dispatched to msgobj.""" + + def __init__(self, addr, msgobj): + try: self.host, self.port = addr.split(':') + except: raise ValueError('sam port required') + self.port = int(self.port) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.msgobj = msgobj + try: + self.sock.connect((self.host, self.port)) + except: + raise i2p.RouterError('could not contact SAM bridge on ' + + self.host + ':' + str(self.port)) + thread.start_new_thread(self._poll_loop, ()) + self.error = None + self.lost_error = i2p.RouterError('SAM bridge connection lost') + + def _poll_loop(self): + """Polling loop for incoming messages.""" + try: + while True: + # Read until newline + line = [] + while True: + try: c = self.sock.recv(1) + except socket.error, ex: self.error = self.lost_error + if c == '': self.error = self.lost_error + if self.error != None: return + if c == '\n': break + if c != '': line += [c] + line = ''.join(line) + if sam_log: + logf = open('log.txt', 'a') + logf.write('\n' + line + '\n') + logf.close() + (msg, kwargs) = self._samdecode(line) + # Read N bytes if SIZE=N is present. + if 'SIZE' in kwargs: + data = [] + remain = int(kwargs['SIZE']) + while True: + try: s = self.sock.recv(remain) + except socket.error, ex: self.error = self.lost_error + if s == '': self.error = self.lost_error + if self.error != None: return + if s != '': data += [s] + remain -= len(s) + if remain <= 0: break + data = ''.join(data) + # Store the read data in kwargs['DATA']. + kwargs['DATA'] = data + del kwargs['SIZE'] + # Dispatch the message + try: self.on_message(msg, kwargs) + except Exception, e: + # On exception in on_message, print a warning, keep going. + print 'Unhandled exception in polling thread.' + traceback.print_exc() + + # Don't need to sleep since recv() blocks. + # End of while loop + except Exception, e: + # For other exceptions, print a fatal error and stop polling. + print 'Fatal exception in polling thread' + traceback.print_exc(); sys.exit() + + def _samdecode(self, s): + """Given a SAM command, returns (a, b), where a is the string at + the beginning of the command, and b is a dictionary of name, + value pairs for the command.""" + (args, kwargs) = ([], {}) + for w in shlex.split(s): + if '=' in w: kwargs[w.split('=')[0]] = w.split('=')[1] + else: args += [w] + return (' '.join(args), kwargs) + + def check_message(self, kwargs): + """Raises an error if kwargs['RESULT'] != 'OK'.""" + if not kwargs.get('RESULT', '') in ['OK', '']: + raise i2p.sam.NetworkError((kwargs['RESULT'], + kwargs.get('MESSAGE', ''))) + + def on_message(self, msg, kwargs): + """Process a SAM message that was received. Dispatch to + self._on_MESSAGE_NAME(**kwargs).""" + name = '_on_' + msg.upper().replace(' ', '_') + getattr(self.msgobj, name)(**kwargs) + + def send_message(self, msg): + """Send a message to the SAM bridge. A newline will be + automatically added if none is present.""" + self.check() + if not '\n' in msg: msg = msg + '\n' + if sam_log: + logf = open('log.txt', 'a') + logf.write('\n' + msg) + logf.close() + try: self.sock.sendall(msg) + except socket.error: self.error = self.lost_error + self.check() + + def check(self): + """Raise an error if terminal was closed, otherwise do + nothing.""" + if self.error != None: raise self.error + + def close(self): + """Close the SAM terminal.""" + # If data is sent via STREAM SEND, and the socket is closed + # immediately, the data will be lost. Delay 0.01 s to fix this + # bug (tested Windows, Linux). + time.sleep(0.01) + self.error = i2p.sam.ClosedError() + self.sock.close() + + def queue_get(self, q): + """Identical to q.get() unless a call to self.check() fails, + in which case the waiting is cut short with an error.""" + while True: + try: return q.get_nowait() + except Queue.Empty: pass + self.check() + sleep() + + +# ------------------------------------------------------- +# StringBuffer: A FIFO for string data. +# ------------------------------------------------------- + +class Deque: + """A double-ended queue.""" + def __init__(self): + self.a = [] + self.b = [] + def push_last(self, obj): + """Append obj to the end of the deque.""" + self.b.append(obj) + def push_first(self, obj): + """Prepend obj to the beginning of the deque.""" + self.a.append(obj) + def _partition(self): + if len(self) > 1: + self.a.reverse() + all = self.a + self.b + n = len(all) / 2 + self.a = all[:n] + self.b = all[n:] + self.a.reverse() + def pop_last(self): + """Pop an item off the end of the deque, and return it.""" + if not self.b: self._partition() + try: return self.b.pop() + except: return self.a.pop() + def pop_first(self): + """Pop an item off the beginning of the deque, and return it.""" + if not self.a: self._partition() + try: return self.a.pop() + except: return self.b.pop() + def __len__(self): + """Number of items in the deque.""" + return len(self.b) + len(self.a) + +class StringBuffer(Deque): + """A FIFO for characters. Strings can be efficiently + appended to the end, and read from the beginning. + + Example: + B = StringBuffer('Hello W') + B.append('orld!') + print B.read(5) # 'Hello' + print B.read() # 'World!' + """ + def __init__(self, s=''): + Deque.__init__(self) + self.length = 0 + self.append(s) + def append(self, s): + """Append string data to the end of the buffer.""" + n = 128 + for block in [s[i:i+n] for i in range(0,len(s),n)]: + self.push_last(block) + self.length += len(s) + def prepend(self, s): + """Prepend string data to the beginning of the buffer.""" + n = 128 + blocks = [s[i:i+n] for i in range(0,len(s),n)] + blocks.reverse() + for block in blocks: + self.push_first(block) + self.length += len(s) + def read(self, n=None): + """Read n bytes of data (or less if less data available) from the + beginning of the buffer. The data is removed. If n is + omitted, read the entire buffer.""" + if n == None or n > len(self): n = len(self) + destlen = len(self) - n + ans = [] + while len(self) > destlen: + ans += [self.pop_first()] + self.length -= len(ans[-1]) + ans = ''.join(ans) + self.prepend(ans[n:]) + ans = ans[:n] + return ans + def peek(self, n=None): + """Like read(), but do not remove the data that is returned.""" + ans = self.read(n) + self.prepend(ans) + return ans + def __len__(self): return self.length + def __str__(self): return self.peek() + def __repr__(self): return 'StringBuffer(' + str(self) + ')' + +# ----------------------------------------------------- +# BaseSession +# ----------------------------------------------------- + +class BaseSession: + """Base session, from which StreamSession, DatagramSession, + and RawSession are derived.""" + + def __init__(self, addr=''): + if addr == '': addr = i2p.sam.samaddr + self.term = SAMTerminal(addr=addr, msgobj=self) + self.lock = threading.Lock() # Data lock. + self.closed = False + self.qhello = Queue.Queue() # Thread messaging, HELLO REPLY. + self.qnaming = Queue.Queue() # Thread messaging, NAMING REPLY. + self.qsession = Queue.Queue() # Thread messaging, SESSION STATUS. + self._hello() # Do handshake with SAM bridge. + + def _hello(self): + """Internal command, handshake with SAM terminal.""" + self.term.send_message('HELLO VERSION MIN=' + + str(i2p.sam.samver) + ' MAX=' + str(i2p.sam.samver)) + self.term.check_message(self.term.queue_get(self.qhello)) + + def _on_HELLO_REPLY(self, **kwargs): + """Internal command, got HELLO REPLY.""" + self.qhello.put(kwargs) # Pass kwargs back to _hello. + + def _on_SESSION_STATUS(self, **kwargs): + """Internal command, got SESSION STATUS.""" + self.qsession.put(kwargs) # Pass kwargs back to main thread. + + def _namelookup(self, name): + """Internal command, does a NAMING LOOKUP query.""" + self.term.send_message('NAMING LOOKUP NAME=' + name) + # Read back response, check it, and return X in VALUE=X. + kwargs = self.term.queue_get(self.qnaming) + self.term.check_message(kwargs) + return kwargs['VALUE'] + + def _on_NAMING_REPLY(self, **kwargs): + """Internal command, got NAMING REPLY.""" + self.qnaming.put(kwargs) # Pass kwargs back to _namelookup. + + def _set_properties(self): + """Internal command, call at end of __init__ to set up + properties.""" + self.dest = self._namelookup('ME') + + def close(self): + """Close the session.""" + # Synchronize + self.lock.acquire() + try: + # Close the terminal if we're not already closed. + if not self.closed: self.term.close() + self.closed = True + finally: self.lock.release() + + def _encode_kwargs(self, **kwargs): + """Internal command, encode extra kwargs for passing to + SESSION CREATE.""" + ans = '' + for k in kwargs: + if k == 'in_depth': + ans += ' tunnels.depthInbound=' + \ + str(int(kwargs['in_depth'])) + elif k == 'out_depth': + ans += ' tunnels.depthOutbound=' + \ + str(int(kwargs['out_depth'])) + else: + raise ValueError('unexpected keyword argument ' + repr(k)) + return ans + +# ----------------------------------------------------- +# StreamSession +# ----------------------------------------------------- + +class StreamSession(BaseSession): + """Stream session. All methods are blocking and threadsafe.""" + + def __init__(self, name, addr='', **kwargs): + if addr == '': addr = i2p.sam.samaddr + BaseSession.__init__(self, addr) + self.idmap = {} # Maps id to Stream object. + self.qaccept = Queue.Queue() # Thread messaging, accept. + self.name = name + self.max_accept = 0 # Max queued incoming connections. + + # Create stream session. + if name == '': + name = 'TRANSIENT' + + # DIRECTION=BOTH (the default) is used because we can't know in + # advance whether a session will call listen(). + + self.term.send_message('SESSION CREATE STYLE=STREAM' + + ' DESTINATION=' + name + self._encode_kwargs(**kwargs)) + self.term.check_message(self.term.queue_get(self.qsession)) + + self._set_properties() + + def connect(self, dest, timeout=None): + """Create a stream connected to remote destination 'dest'. The + id is random. If the timeout is exceeded, do NOT raise an + error; rather, return a Stream object with .didconnect set + to False.""" + if not isinstance(dest, type('')): raise TypeError + # Synchronize + self.lock.acquire() + try: + # Pick a positive stream id at random. + while True: + # 9/10 probability of success per iteration + id = random.randrange(1, len(self.idmap) * 10 + 2) + if not id in self.idmap: + ans = Stream(self, dest, id, didconnect=False) + self.idmap[id] = ans + break + finally: self.lock.release() + # Send STREAM CONNECT and wait for reply. + self.term.send_message('STREAM CONNECT ID=' + str(id) + + ' DESTINATION=' + str(dest)) + + # Now wait until the stream's .didconnect flag is set to True. + if timeout != None: end = time.clock() + timeout + while True: + self.term.check() + if ans.didconnect: break + if timeout != None and time.clock() >= end: break + sleep() + + return ans + + def _on_STREAM_STATUS(self, **kwargs): + """Internal command, got STREAM STATUS. Unblocks connect.""" + # Store error is needed + try: self.term.check_message(kwargs) + except Exception, e: + try: self.idmap[int(kwargs['ID'])].err = e + except: pass # Closed too quickly + + # Now set .didconnect flag to True. + try: self.idmap[int(kwargs['ID'])].didconnect = True + except: pass # Closed too quickly + + def accept(self, timeout=None): + """Wait for incoming connection, and return a Stream object + for it.""" + if self.max_accept <= 0: + raise i2p.Error('listen(n) must be called before accept ' + + '(n>=1)') + if timeout != None: end = time.clock() + timeout + while True: + self.term.check() + # Synchronized + self.lock.acquire() + try: + # Get Stream object if available. + if self.qaccept.qsize() > 0: + return self.term.queue_get(self.qaccept) + finally: self.lock.release() + if timeout != None and time.clock() >= end: break + sleep() + + # Handle timeout and blocking errors + if timeout == 0.0: + raise i2p.sam.BlockError('command would have blocked') + else: + raise i2p.sam.Timeout('timed out') + + def listen(self, backlog): + """Set maximum number of queued connections.""" + if self.closed: raise sam.ClosedError() + self.max_accept = backlog + + def _on_STREAM_CONNECTED(self, **kwargs): + """Got STREAM CONNECTED command. This is what accept() commands + wait for.""" + # Synchronize + self.lock.acquire() + try: + # Drop connection if over maximum size. + if self.qaccept.qsize() >= self.max_accept: + self.term.send_message('STREAM CLOSE ID=' + + str(int(kwargs['ID']))) + return + + # Parse, create Stream, and place on self.qaccept. + self.term.check_message(kwargs) + # A negative id is chosen for us + id = int(kwargs['ID']) + self.idmap[id] = Stream(self, kwargs['DESTINATION'], id) + # Pass Stream object back to accept. + self.qaccept.put(self.idmap[id]) + finally: self.lock.release() + + def _send_stream(self, id, data): + """Internal command, send data to stream id. Use Stream.send + in your code.""" + self.term.send_message('STREAM SEND ID=' + str(id) + ' SIZE=' + + str(len(data)) + '\n' + data) + + def _on_STREAM_CLOSED(self, **kwargs): + """Got STREAM CLOSED command. Call idmap[id].on_close(e) and + delete idmap[id].""" + id = int(kwargs['ID']) + + # No error is produced for a graceful remote close. + e = None + try: self.term.check_message(kwargs) + except i2p.Error, err: e = err + + # Synchronize + self.lock.acquire() + try: + # Sent STREAM CLOSE, SAM didn't hear us in time. + if not id in self.idmap: return + # Pop id from self.idmap, if available. + obj = self.idmap[id] + del self.idmap[id] + finally: self.lock.release() + + # Process on_close message. + obj.on_close(None) + + def _on_STREAM_RECEIVED(self, **kwargs): + """Got STREAM RECEIVED command. Dispatch to + idmap[id].on_receive(s).""" + id = int(kwargs['ID']) + if not id in self.idmap: + # _on_STREAM_CONNECTED blocks until self.idmap[id] is properly + # set up. Therefore, we have received a stream packet despite + # closing the stream immediately after _on_STREAM_CONNECTED + # (SAM ignored us). So ignore it. + return + self.idmap[id].on_receive(kwargs['DATA']) + + def __len__(self): + """Unconnected session; has no read data available.""" + return 0 + + +class Stream: + """Receives and sends data for an individual stream.""" + + def __init__(self, parent, remotedest, id, didconnect=True): + self.parent = parent + self.buf = StringBuffer() + self.localdest = parent.dest + self.remotedest = remotedest + self.id = id + # Data lock. Allow multiple acquire()s by same thread + self.lock = threading.RLock() + self.closed = False + # Error message, on STREAM STATUS, or on STREAM CLOSED. + self.err = None + # Whether stream got a STREAM CONNECTED message + self.didconnect = didconnect + + def send(self, s): + """Sends the string s, blocking if necessary.""" + id = self.id + if self.closed or id == None: + if self.err != None: raise self.err + raise i2p.sam.ClosedError('stream closed') + if len(s) == 0: return + nmax = 32768 + for block in [s[i:i+nmax] for i in range(0,len(s),nmax)]: + self.parent._send_stream(id, block) + + def recv(self, n, timeout=None, peek=False, waitall=False): + """Reads up to n bytes in a manner identical to socket.recv. + Blocks for up to timeout seconds if n > 0 and no data is + available (timeout=None means wait forever). If still no data + is available, raises BlockError or Timeout. For a closed + stream, recv will read the data stored in the buffer until + EOF, at which point the read data will be truncated. If peek + is True, the data is not removed. If waitall is True, reads + exactly n bytes, or raises BlockError or Timeout as + appropriate. Returns data.""" + + if n < 0: raise ValueError + if n == 0: return '' + + minlen = 1 + if waitall: minlen = n + + if timeout != None: end = time.clock() + timeout + while True: + # Synchronized check and read until data available. + self.parent.term.check() + self.lock.acquire() + try: + if len(self.buf) >= minlen: + if peek: return self.buf.peek(n) + else: return self.buf.read(n) + # Graceful close: return as much data as possible + # (up to n bytes). + if self.closed and self.err == None: return self.buf.read(n) + # Ungraceful close: raise an error. + if self.err != None: raise self.err + finally: self.lock.release() + if timeout != None and time.clock() >= end: break + sleep() + + # Handle timeout and blocking error + if timeout == 0.0: + raise i2p.sam.BlockError('command would have blocked') + else: + raise i2p.sam.Timeout('timed out') + + def __len__(self): + """Current length of read buffer.""" + return len(self.buf) + + def close(self): + """Close the stream. Threadsafe.""" + # Synchronize self.parent. + self.parent.lock.acquire() + try: + if not self.closed: + self.closed = True + id = self.id + # Set self.id to None, so we don't close a new stream by + # accident. + self.id = None + if not id in self.parent.idmap: return + self.parent.term.send_message('STREAM CLOSE ID=' + str(id)) + # No error is produced for a locally closed stream + self.on_close(None) + del self.parent.idmap[id] + finally: self.parent.lock.release() + + def on_receive(self, s): + # Synchronize + self.lock.acquire() + try: + self.buf.append(s) + finally: self.lock.release() + + def on_close(self, e): + self.closed = True + self.err = e + + def __del__(self): + self.close() + +# ----------------------------------------------------- +# DatagramSession +# ----------------------------------------------------- + +class DatagramSession(BaseSession): + """Datagram session. All methods are blocking and threadsafe.""" + + def __init__(self, name, addr='', **kwargs): + if addr == '': addr = i2p.sam.samaddr + BaseSession.__init__(self, addr) + self.buf = Deque() # FIFO of incoming packets. + self.name = name + + # Create datagram session + if name == '': name = 'TRANSIENT' + self.term.send_message('SESSION CREATE STYLE=DATAGRAM ' + + 'DESTINATION=' + name + self._encode_kwargs(**kwargs)) + self.term.check_message(self.term.queue_get(self.qsession)) + + self._set_properties() + + def _on_DATAGRAM_RECEIVED(self, **kwargs): + """Internal method, got DATAGRAM RECEIVED.""" + # Synchronized + self.lock.acquire() + try: + self.buf.push_last((kwargs['DATA'], kwargs['DESTINATION'])) + finally: self.lock.release() + + def send(self, s, dest): + """Send packet with contents s to given destination.""" + # Raise error if packet is too large. + if len(s) > i2p.sam.MAX_DGRAM: + raise ValueError('packets must have length <= ' + + str(i2p.sam.MAX_DGRAM) + ' bytes') + self.term.send_message('DATAGRAM SEND DESTINATION=' + dest + + ' SIZE=' + str(len(s)) + '\n' + s) + + def recv(self, timeout=None, peek=False): + """Get a single packet. Blocks for up to timeout seconds if + n > 0 and no packet is available (timeout=None means wait + forever). If still no packet is available, raises BlockError + or Timeout. Returns the pair (data, address). If peek is + True, the data is not removed.""" + if timeout != None: end = time.clock() + timeout + while True: + self.term.check() + # Synchronized check and read until data available. + self.lock.acquire() + try: + if len(self.buf) > 0: + if peek: + ans = self.buf.pop_first() + self.buf.push_first(ans) + return ans + else: + return self.buf.pop_first() + finally: self.lock.release() + if timeout != None and time.clock() >= end: break + sleep() + + # Handle timeout and blocking error + if timeout == 0.0: + raise i2p.sam.BlockError('command would have blocked') + else: + raise i2p.sam.Timeout('timed out') + + def __len__(self): + """Number of packets in read buffer.""" + return len(self.buf) + +# ----------------------------------------------------- +# RawSession +# ----------------------------------------------------- + +class RawSession(BaseSession): + """Raw session. All methods are blocking and threadsafe.""" + + def __init__(self, name, addr='', **kwargs): + if addr == '': addr = i2p.sam.samaddr + BaseSession.__init__(self, addr) + self.buf = Deque() # FIFO of incoming packets. + self.name = name + + # Create raw session + if name == '': name = 'TRANSIENT' + self.term.send_message('SESSION CREATE STYLE=RAW DESTINATION=' + + name + self._encode_kwargs(**kwargs)) + self.term.check_message(self.term.queue_get(self.qsession)) + + self._set_properties() + + def _on_RAW_RECEIVED(self, **kwargs): + """Internal method, got RAW RECEIVED.""" + # Synchronized + self.lock.acquire() + try: + self.buf.push_last((kwargs['DATA'], '')) + finally: self.lock.release() + + def send(self, s, dest): + """Send packet with contents s to given destination.""" + # Raise error if packet is too big + if len(s) > i2p.sam.MAX_RAW: + raise ValueError('packets must have length <= ' + + str(i2p.sam.MAX_RAW) + ' bytes') + self.term.send_message('RAW SEND DESTINATION=' + dest + + ' SIZE=' + str(len(s)) + '\n' + s) + + def recv(self, timeout=None, peek=False): + """Identical to DatagramSocket.recv. The from address is an + empty string.""" + if timeout != None: end = time.clock() + timeout + while True: + self.term.check() + # Synchronized check and read until data available. + self.lock.acquire() + try: + if len(self.buf) > 0: + if peek: + ans = self.buf.pop_first() + self.buf.push_first(ans) + return ans + else: + return self.buf.pop_first() + finally: self.lock.release() + if timeout != None and time.clock() >= end: break + sleep() + + # Handle timeout and blocking error + if timeout == 0.0: + raise i2p.sam.BlockError('command would have blocked') + else: + raise i2p.sam.Timeout('timed out') + + def __len__(self): + """Number of packets in read buffer.""" + return len(self.buf) + + +# ----------------------------------------------------- +# End of file +# ----------------------------------------------------- diff --git a/apps/sam/python/src/i2p/test/readme.txt b/apps/sam/python/src/i2p/test/readme.txt new file mode 100644 index 000000000..9012b1137 --- /dev/null +++ b/apps/sam/python/src/i2p/test/readme.txt @@ -0,0 +1,6 @@ + +Unit tests for I2P Python interface. + +Note that these aren't all unit tests yet. + +Some are demos, some require manual intervention. diff --git a/apps/sam/python/src/i2p/test/test_eep.py b/apps/sam/python/src/i2p/test/test_eep.py new file mode 100644 index 000000000..34b11f078 --- /dev/null +++ b/apps/sam/python/src/i2p/test/test_eep.py @@ -0,0 +1,37 @@ + +# ----------------------------------------------------- +# test_eep.py: Unit tests for eep.py. +# ----------------------------------------------------- + +# Make sure we can import i2p +import sys; sys.path += ['../../'] + +import traceback, sys +from i2p import eep, sam, samclasses + +def verify_html(s): + """Raise an error if s does not end with """ + assert s.strip().lower()[-7:] == '' + +def eepget_test(): + try: + verify_html(eep.urlget('http://duck.i2p/index.html')) + verify_html(eep.urlget('http://duck.i2p/')) + verify_html(eep.urlget('http://duck.i2p')) + verify_html(eep.urlget('duck.i2p/')) + verify_html(eep.urlget('duck.i2p')) + except Exception, e: + print 'Unit test failed for eepget' + print "Note that urllib2.urlopen uses IE's proxy settings " + \ + "in Windows." + print "This may cause " + \ + "urllib2.urlopen('http://www.google.com/') to fail." + traceback.print_exc(); sys.exit() + print 'eepget: OK' + +def test(): + eepget_test() + +if __name__ == '__main__': + print 'Testing:' + test() diff --git a/apps/sam/python/src/i2p/test/test_sam.py b/apps/sam/python/src/i2p/test/test_sam.py new file mode 100644 index 000000000..c2d142875 --- /dev/null +++ b/apps/sam/python/src/i2p/test/test_sam.py @@ -0,0 +1,442 @@ + +# ----------------------------------------------------- +# test_sam.py: Unit tests for sam.py. +# ----------------------------------------------------- + +# Make sure we can import i2p +import sys; sys.path += ['../../'] + +import traceback, time, thread, threading, random, copy +from i2p import eep, sam + +def test_passed(s, msg='OK'): + """Notify user that the given unit test passed.""" + print ' ' + (s + ':').ljust(50) + msg + +def verify_html(s): + """Raise an error if s does not end with """ + assert s.strip().lower()[-7:] == '' + +def stream_client(dest): + """Sub-unit test for sam.socket in SOCK_STREAM mode.""" + S = sam.socket('Alice', sam.SOCK_STREAM) + S.connect(dest) + S.send('GET / HTTP/1.0\r\n\r\n') # Send request + f = S.makefile() # File object + + while True: # Read header + line = f.readline().strip() # Read a line + if line == '': break # Content begins + + s = f.read() # Get content + f.close() + S.close() + +def stream_client_test(): + """Unit test for sam.socket in SOCK_STREAM mode.""" + url = 'morph.i2p' + stream_client('http://' + url + '/') + stream_client(url) + stream_client(url + '/') + stream_client('http://' + url) + stream_client(sam.resolve('http://' + url + '/')) + test_passed('sam.socket stream client') + +def packet_test(raw=True): + """Unit test for sam.socket in SOCK_DGRAM or SOCK_RAW modes.""" + + try: + multithread_wait_time = 500.0 + may_need_increase = False + + if raw: + C = sam.socket('Carola', sam.SOCK_RAW, in_depth=0, out_depth=0) + D = sam.socket('Davey', sam.SOCK_RAW, in_depth=0, out_depth=0) + else: + C = sam.socket('Carol', sam.SOCK_DGRAM,in_depth=0,out_depth=0) + D = sam.socket('Dave', sam.SOCK_DGRAM, in_depth=0, out_depth=0) + + global C_recv, D_recv, C_got, D_got, __lock + C_recv = [] # Packets C *should* receive + D_recv = [] # Packets D *should* receive + C_got = [] # Packets C actually got + D_got = [] # Packets D actually got + + n = 50 # Create n threads + m = 40 # Each thread sends m packets + + global __done_count + __done_count = 0 + __lock = threading.Lock() + + # Use C and D to send and read in many different threads. + def f(): + # This code is run in each separate thread + global C_recv, D_recv, C_got, D_got, __lock, __done_count + for i in range(m): + # Random binary string of length 2-80. + index_list = range(random.randrange(2, 80)) + s = ''.join([chr(random.randrange(256)) for j in index_list]) + if random.randrange(2) == 0: + # Send packet from C to D, and log it. + C.sendto(s, 0, D.dest) + __lock.acquire() + D_recv += [s] + __lock.release() + else: + # Send packet from D to C, and log it. + D.sendto(s, 0, C.dest) + __lock.acquire() + C_recv += [s] + __lock.release() + time.sleep(0.01*random.uniform(0.0,1.0)) + # Read any available packets. + try: (p, fromaddr) = C.recvfrom(1000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == D.dest + + __lock.acquire() + if p != None: C_got += [p] + __lock.release() + + try: (p, fromaddr) = D.recvfrom(1000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == C.dest + + __lock.acquire() + if p != None: D_got += [p] + __lock.release() + + __lock.acquire() + __done_count += 1 + __lock.release() + + # Create n threads. + for i in range(n): + threading.Thread(target=f).start() + + # Wait for them to finish. + while __done_count < n: time.sleep(0.01) + + # Read any left-over received packets. + end_time = time.clock() + multithread_wait_time + while time.clock() < end_time: + # Read any available packets. + try: (p, fromaddr) = C.recvfrom(1000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == D.dest + + if p != None: C_got += [p] + + try: (p, fromaddr) = D.recvfrom(1000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == C.dest + + if p != None: D_got += [p] + if len(C_got) == len(C_recv) and len(D_got) == len(D_recv): + break + + if time.clock() >= end_time: + may_need_increase = True + + C_got.sort() + D_got.sort() + C_recv.sort() + D_recv.sort() + + assert C_got == C_recv + assert D_got == D_recv + + C.close() + D.close() + except: + if raw: + print 'Unit test failed for sam.socket (SOCK_RAW).' + print 'Raw packets are not reliable.' + else: + print 'Unit test failed for sam.socket (SOCK_DGRAM).' + print 'Datagram packets are not reliable.' + + if may_need_increase: + print 'Try increasing multithread_wait_time.' + + traceback.print_exc(); sys.exit() + + if raw: + test_passed('sam.socket (SOCK_RAW)') + else: + test_passed('sam.socket (SOCK_RAW)') + +def stream_test(): + """Multithreaded unit test for sam.socket (SOCK_STREAM).""" + + try: + multithread_wait_time = 200.0 + may_need_increase = False + + kwargs = {'in_depth':0, 'out_depth':0} + C = sam.socket('Carolic', sam.SOCK_STREAM, **kwargs) + D = sam.socket('David', sam.SOCK_STREAM, **kwargs) + Cout = sam.socket('Carolic', sam.SOCK_STREAM, **kwargs) + Dout = sam.socket('David', sam.SOCK_STREAM, **kwargs) + + assert C.dest == Cout.dest + assert D.dest == Dout.dest + + C.listen(5) + D.listen(5) + Cout.connect(D.dest) + Dout.connect(C.dest) + (Cin, ignoredest) = C.accept() + (Din, ignoredest) = D.accept() + + global C_recv, D_recv, C_got, D_got, __lock + C_recv = [] # String data C *should* receive + D_recv = [] # String data D *should* receive + C_got = [] # String data C actually got + D_got = [] # String data D actually got + + n = 50 # Create n threads + m = 40 # Each thread sends m strings + + global __done_count + __done_count = 0 + __lock = threading.Lock() + + # Use C and D to send and read in many different threads. + def f(): + # This code is run in each separate thread + global C_recv, D_recv, C_got, D_got, __lock, __done_count + for i in range(m): + # Random binary string of length 2-80. + index_list = range(random.randrange(2, 80)) + s = ''.join([chr(random.randrange(256)) for j in index_list]) + if random.randrange(2) == 0: + # Send packet from C to D, and log it. + __lock.acquire() + Cout.send(s) + D_recv += [s] + __lock.release() + else: + # Send packet from D to C, and log it. + __lock.acquire() + Dout.send(s) + C_recv += [s] + __lock.release() + time.sleep(0.01*random.uniform(0.0,1.0)) + # Read any available string data, non-blocking. + + __lock.acquire() + try: p = Cin.recv(100000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None: C_got += [p] + __lock.release() + + __lock.acquire() + try: p = Din.recv(100000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None: D_got += [p] + __lock.release() + + __lock.acquire() + __done_count += 1 + __lock.release() + + # Create n threads. + for i in range(n): + threading.Thread(target=f).start() + + # Wait for them to finish. + while __done_count < n: time.sleep(0.01) + + # Read any left-over received string data. + end_time = time.clock() + multithread_wait_time + while time.clock() < end_time: + # Read any available string data, non-blocking. + try: p = Cin.recv(100000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None: C_got += [p] + + try: p = Din.recv(100000, sam.MSG_DONTWAIT) + except sam.BlockError: p = None + if p != None: D_got += [p] + + if len(''.join(C_got)) == len(''.join(C_recv)) and \ + len(''.join(D_got)) == len(''.join(D_recv)): + break + + if time.clock() >= end_time: + may_need_increase = True + + C_got = ''.join(C_got) + D_got = ''.join(D_got) + C_recv = ''.join(C_recv) + D_recv = ''.join(D_recv) + assert C_got == C_recv + assert D_got == D_recv + + Cin.close() + Din.close() + Cout.close() + Dout.close() + C.close() + D.close() + except: + print 'Unit test failed for sam.socket ' + \ + '(SOCK_STREAM, multithreaded).' + + if may_need_increase: + print 'Try increasing multithread_wait_time.' + + traceback.print_exc(); sys.exit() + + test_passed('sam.socket (SOCK_STREAM, multithreaded)') + + +def noblock_stream_test(): + """Unit test for non-blocking stream commands and listen.""" + + serv = sam.socket('Allison',sam.SOCK_STREAM,in_depth=0,out_depth=0) + serv.setblocking(False) + serv.listen(100) + assert serv.gettimeout() == 0.0 + + msg_to_client = 'Hi, client!!!!' + msg_to_server = 'Hi, server!' + + nconnects = 5 + + global server_done, client_count, client_lock + server_done = False + client_count = 0 + client_lock = threading.Lock() + + def serv_func(n = nconnects): + while True: + try: + (C, ignoredest) = serv.accept() + C.send(msg_to_client) + rmsg = C.recv(len(msg_to_server), sam.MSG_WAITALL) + if rmsg != msg_to_server: + raise ValueError('message should have been: ' + + repr(msg_to_server) + ' was: ' + repr(rmsg)) + C.close() + n -= 1 + if n == 0: break + except sam.BlockError: + pass + time.sleep(0.01) + global server_done + server_done = True + + def client_func(): + # FIXME: i2p.sam.NetworkError('TIMEOUT', '') errors are produced + # for our streams if we use '' for all clients. Why? + C = sam.socket('Bobb', sam.SOCK_STREAM, in_depth=0, out_depth=0) + C.setblocking(False) + try: + C.connect(serv.dest) + except sam.BlockError: + # One could also use timeout=0.1 and loop + (Rlist, Wlist, Elist) = sam.select([C], [C], [C]) + if len(Elist) > 0: + assert Elist[0] == C + raise Elist[0].sessobj.err + C.send(msg_to_server) + C.setblocking(True) + rmsg = C.recv(len(msg_to_client), sam.MSG_WAITALL) + if rmsg != msg_to_client: + raise ValueError('message should have been: ' + + repr(msg_to_client) + ' was: ' + repr(rmsg)) + C.close() + global client_count, client_lock + + # Synchronized + client_lock.acquire() + try: client_count += 1 + finally: client_lock.release() + + + thread.start_new_thread(serv_func, ()) + + for i in range(nconnects): + thread.start_new_thread(client_func, ()) + + while True: + if server_done and client_count == nconnects: break + time.sleep(0.01) + + test_passed('sam.listen (SOCK_STREAM), and non-blocking IO') + +def tunnel_server_demo(): + """Demo for TunnelServer.""" + + T = sam.TunnelServer('Alisick', 8080, in_depth=0, out_depth=0) + + print 'Server ready at:' + print T.dest + while True: + time.sleep(0.01) + +def tunnel_client_demo(): + """Demo for TunnelClient.""" + + T = sam.TunnelClient('Alliaha', 8001, 'duck.i2p', \ + in_depth=0, out_depth=0) + + print 'Serving up duck.i2p at http://127.0.0.1:8001/' + while True: + time.sleep(0.01) + + + +# select, poll +# tunnel_client, tunnel_server +# noblocking unit tests + +def multi_stream_test(n): + """See if we can have n streams open at once.""" + server = None + client = [None] * n + + server = sam.socket('Aligi',sam.SOCK_STREAM,in_depth=0,out_depth=0) + server.listen(n) + + for i in range(n): + client[i] = sam.socket('Bobo', sam.SOCK_STREAM, \ + in_depth=0, out_depth=0) + + for i in range(n): + client[i].connect(server.dest) + client[i].send('Hi') + + for i in range(n): + client[i].close() + server.close() + + test_passed(str(n) + ' streams open at once') + + + +# Todo: Write unit tests for TunnelServer, TunnelClient. + +def test(): + print 'Testing:' + print "Comment and uncomment tests manually, if they don't finish." + +# noblock_stream_test() +# stream_client_test() +# packet_test(raw=True) +# stream_test() +# multi_stream_test(200) + +# Demos (manual unit tests): +# tunnel_server_demo() +# tunnel_client_demo() # This fails too + +# Note: The datagram unit test fails, apparently due to a bug in I2P +# (packet loss). +## packet_test(raw=False) + +if __name__ == '__main__': + test() diff --git a/apps/sam/python/src/i2p/test/test_samclasses.py b/apps/sam/python/src/i2p/test/test_samclasses.py new file mode 100644 index 000000000..7609dd381 --- /dev/null +++ b/apps/sam/python/src/i2p/test/test_samclasses.py @@ -0,0 +1,446 @@ + +# ----------------------------------------------------- +# test_samclasses.py: Unit tests for samclasses.py. +# ----------------------------------------------------- + +# Make sure we can import i2p +import sys; sys.path += ['../../'] + +import traceback, time, thread, threading, random +from i2p import eep, sam, samclasses + +def test_passed(s, msg='OK'): + """Notify user that the given unit test passed.""" + print ' ' + (s + ':').ljust(50) + msg + +def verify_html(s): + """Raise an error if s does not end with """ + assert s.strip().lower()[-7:] == '' + +def resolve_test(name='duck.i2p'): + """Unit test for resolve.""" + try: + rname = sam.resolve(name) + except: + print 'Unit test failed for sam.resolve' + traceback.print_exc(); sys.exit() + + test_passed('sam.resolve', 'See below') + print ' Use hosts.txt to verify that ' + name + '=' + \ + rname[:15] + '...' + +def raw_test1(): + """Unit test for samclasses.RawSession.""" + + try: + C = samclasses.RawSession('Carol') + D = samclasses.RawSession('Dave') + + C.send('Hello!', D.dest) + D.send('Hi C!', C.dest) + + (packet, addr) = C.recv(1000) + assert packet == 'Hi C!' + (packet, addr) = D.recv(1000) + assert packet == 'Hello!' + C.close() + D.close() + except: + print 'Unit test failed for samclasses.RawSession' + traceback.print_exc(); sys.exit() + test_passed('samclasses.RawSession') + +def datagram_test1(): + """Unit test for samclasses.DatagramSession.""" + + try: + C = samclasses.DatagramSession('Carol') + D = samclasses.DatagramSession('Dave') + + C.send('Hello!', D.dest) + D.send('Hi C!', C.dest) + + (packet, remotedest) = C.recv(1000) + assert str(packet) == 'Hi C!' and remotedest == D.dest + (packet, remotedest) = D.recv(1000) + assert str(packet) == 'Hello!' and remotedest == C.dest + C.close() + D.close() + except: + print 'Unit test failed for samclasses.DatagramSession' + traceback.print_exc(); sys.exit() + test_passed('samclasses.DatagramSession') + +def stream_readline(S): + """Read a line, with a \r\n newline, including trailing \r\n.""" + ans = [] + while True: + c = S.recv(1) + if c == '': break + if c == '\n': break + ans += [c] + return ''.join(ans) + +def stream_http_get(S, dest): + """Get contents of http://dest/ via HTTP/1.0 and + samclasses.StreamSession S.""" + C = S.connect(dest) + + C.send('GET / HTTP/1.0\r\n\r\n') + + while True: + line = stream_readline(C).strip() + if line.find('Content-Length: ') == 0: + clen = int(line.split()[1]) + if line == '': break + + s = C.recv(clen, timeout=None) + time.sleep(2.0) + C.close() + return s + +def stream_test1(): + """Unit test for samclasses.StreamSession.connect.""" + + try: + dest = sam.resolve('duck.i2p') + S = samclasses.StreamSession('Bob') + verify_html(stream_http_get(S, dest)) + verify_html(stream_http_get(S, dest)) + verify_html(stream_http_get(S, dest)) + S.close() + + except: + print 'Unit test failed for samclasses.StreamSession' + traceback.print_exc(); sys.exit() + test_passed('samclasses.StreamSession.connect') + +def stream_test2(): + """Unit test for samclasses.StreamSession.accept.""" + global __server_done, __client_done, __err + __server_done = False + __client_done = False + __err = None + + S = samclasses.StreamSession('Bob') + S.listen(10) + msg = '

Hello!

' + + def serve(): + try: + # Serve 3 connections, then quit. + for i in range(3): + C = S.accept() # Get a connection. + req = stream_readline(C) # Read HTTP request. + + s = msg # Message to send back + + C.send('HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n' + + 'Content-Length: ' + str(int(len(s))) + '\r\n\r\n' + s) + + if i % 2 == 0: C.close() # Close connection + S.close() + except Exception, e: + global __err + __err = e + global __server_done + __server_done = True + + thread.start_new_thread(serve, ()) + # Wait for accept to kick in (should work without). + time.sleep(2.0) + + def client(): + try: + S2 = samclasses.StreamSession('Carol') + # Get / on server three times. + assert stream_http_get(S2, S.dest) == msg + assert stream_http_get(S2, S.dest) == msg + assert stream_http_get(S2, S.dest) == msg + S2.close() + except Exception, e: + global __err + __err = e + global __client_done + __client_done = True + + thread.start_new_thread(client, ()) + + while not (__client_done and __server_done): time.sleep(0.01) + + if __err != None: + print 'Unit test failed for samclasses.StreamSession.accept' + raise __err + test_passed('samclasses.StreamSession.accept') + +def multithread_packet_test(raw=True): + """If raw: Multithreaded unit test for samclasses.RawSession. + Not raw: Multithreaded unit test for samclasses.DatagramSession. + """ + + try: + multithread_wait_time = 200.0 + may_need_increase = False + + if raw: + C = samclasses.RawSession('Carol', in_depth=0, out_depth=0) + D = samclasses.RawSession('Dave', in_depth=0, out_depth=0) + else: + C = samclasses.DatagramSession('Carol',in_depth=0,out_depth=0) + D = samclasses.DatagramSession('Dave',in_depth=0,out_depth=0) + + global C_recv, D_recv, C_got, D_got, __lock + C_recv = [] # Packets C *should* receive + D_recv = [] # Packets D *should* receive + C_got = [] # Packets C actually got + D_got = [] # Packets D actually got + + n = 50 # Create n threads + m = 40 # Each thread sends m packets + + global __done_count + __done_count = 0 + __lock = threading.Lock() + + # Use C and D to send and read in many different threads. + def f(): + # This code is run in each separate thread + global C_recv, D_recv, C_got, D_got, __lock, __done_count + for i in range(m): + # Random binary string of length 2-80. + index_list = range(random.randrange(2, 80)) + s = ''.join([chr(random.randrange(256)) for j in index_list]) + if random.randrange(2) == 0: + # Send packet from C to D, and log it. + C.send(s, D.dest) + __lock.acquire() + D_recv += [s] + __lock.release() + else: + # Send packet from D to C, and log it. + D.send(s, C.dest) + __lock.acquire() + C_recv += [s] + __lock.release() + time.sleep(0.01*random.uniform(0.0,1.0)) + # Read any available packets. + try: (p, fromaddr) = C.recv(timeout=0.0) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == D.dest + + __lock.acquire() + if p != None: C_got += [p] + __lock.release() + + try: (p, fromaddr) = D.recv(timeout=0.0) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == C.dest + + __lock.acquire() + if p != None: D_got += [p] + __lock.release() + + __lock.acquire() + __done_count += 1 + __lock.release() + + # Create n threads. + for i in range(n): + threading.Thread(target=f).start() + + # Wait for them to finish. + while __done_count < n: time.sleep(0.01) + + # Read any left-over received packets. + end_time = time.clock() + multithread_wait_time + while time.clock() < end_time: + # Read any available packets. + try: (p, fromaddr) = C.recv(timeout=0.0) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == D.dest + + if p != None: C_got += [p] + + try: (p, fromaddr) = D.recv(timeout=0.0) + except sam.BlockError: p = None + if p != None and not raw: assert fromaddr == C.dest + + if p != None: D_got += [p] + if len(C_got) == len(C_recv) and len(D_got) == len(D_recv): + break + + if time.clock() >= end_time: + may_need_increase = True + + C_got.sort() + D_got.sort() + C_recv.sort() + D_recv.sort() + assert C_got == C_recv + assert D_got == D_recv + + C.close() + D.close() + except: + if raw: + print 'Unit test failed for samclasses.RawSession ' + \ + '(multithreaded).' + print 'Raw packets are not reliable.' + else: + print 'Unit test failed for samclasses.DatagramSession ' + \ + '(multithreaded).' + print 'Datagram packets are not reliable.' + + if may_need_increase: + print 'Try increasing multithread_wait_time.' + + traceback.print_exc(); sys.exit() + if raw: + test_passed('samclasses.RawSession (multithreaded)') + else: + test_passed('samclasses.DatagramSession (multithreaded)') + + + +def multithread_stream_test(): + """Multithreaded unit test for samclasses.StreamSession.""" + + try: + multithread_wait_time = 200.0 + may_need_increase = False + + C = samclasses.StreamSession('Carol', in_depth=0, out_depth=0) + D = samclasses.StreamSession('Dave', in_depth=0, out_depth=0) + C.listen(10) + D.listen(10) + + Cout = C.connect(D.dest) + Dout = D.connect(C.dest) + Cin = C.accept() + Din = D.accept() + + global C_recv, D_recv, C_got, D_got, __lock + C_recv = [] # String data C *should* receive + D_recv = [] # String data D *should* receive + C_got = [] # String data C actually got + D_got = [] # String data D actually got + + n = 50 # Create n threads + m = 40 # Each thread sends m strings + + global __done_count + __done_count = 0 + __lock = threading.Lock() + + # Use C and D to send and read in many different threads. + def f(): + # This code is run in each separate thread + global C_recv, D_recv, C_got, D_got, __lock, __done_count + for i in range(m): + # Random binary string of length 2-80. + index_list = range(random.randrange(2, 80)) + s = ''.join([chr(random.randrange(256)) for j in index_list]) + if random.randrange(2) == 0: + # Send packet from C to D, and log it. + __lock.acquire() + Cout.send(s) + D_recv += [s] + __lock.release() + else: + # Send packet from D to C, and log it. + __lock.acquire() + Dout.send(s) + C_recv += [s] + __lock.release() + time.sleep(0.01*random.uniform(0.0,1.0)) + # Read any available string data, non-blocking. + + __lock.acquire() + try: p = Cin.recv(100000, timeout=0.0) + except sam.BlockError: p = None + if p != None: C_got += [p] + __lock.release() + + __lock.acquire() + try: p = Din.recv(100000, timeout=0.0) + except sam.BlockError: p = None + if p != None: D_got += [p] + __lock.release() + + __lock.acquire() + __done_count += 1 + __lock.release() + + # Create n threads. + for i in range(n): + threading.Thread(target=f).start() + + # Wait for them to finish. + while __done_count < n: time.sleep(0.01) + + # Read any left-over received string data. + end_time = time.clock() + multithread_wait_time + while time.clock() < end_time: + # Read any available string data, non-blocking. + try: p = Cin.recv(100000, timeout=0.0) + except sam.BlockError: p = None + if p != None: C_got += [p] + + try: p = Din.recv(100000, timeout=0.0) + except sam.BlockError: p = None + if p != None: D_got += [p] + + if len(''.join(C_got)) == len(''.join(C_recv)) and \ + len(''.join(D_got)) == len(''.join(D_recv)): + break + + if time.clock() >= end_time: + may_need_increase = True + + C_got = ''.join(C_got) + D_got = ''.join(D_got) + C_recv = ''.join(C_recv) + D_recv = ''.join(D_recv) + assert C_got == C_recv + assert D_got == D_recv + + Cin.close() + Din.close() + Cout.close() + Dout.close() + C.close() + D.close() + except: + print 'Unit test failed for samclasses.StreamSession ' + \ + '(multithreaded).' + + if may_need_increase: + print 'Try increasing multithread_wait_time.' + + traceback.print_exc(); sys.exit() + test_passed('samclasses.StreamSession (multithreaded)') + + +def test(): + print 'Tests may take several minutes each.' + print 'If the network is unreliable, tests will fail.' + print 'A test only needs to pass once to be considered successful.' + print + print 'Testing:' + + resolve_test() + raw_test1() + datagram_test1() + stream_test1() + stream_test2() + multithread_packet_test(raw=True) + multithread_stream_test() + + # Note: The datagram unit test fails, but it's apparently I2P's + # fault (the code is the same as for raw packets, and the sam + # bridge is sent all the relevant data). + # Code: multithread_packet_test(raw=False) + +if __name__ == '__main__': + test() + diff --git a/apps/sam/python/todo.txt b/apps/sam/python/todo.txt new file mode 100644 index 000000000..16963bef7 --- /dev/null +++ b/apps/sam/python/todo.txt @@ -0,0 +1,13 @@ + +Todo: + +* Deal with known bugs (see bugs.txt). +* Clean up code. + See the comments at the top of samclasses.py. +* Use a logger framework (perhaps Python module logging). +* Deal with FIXME comments. +* Unit tests for close then do something errors. + Try closing a stream at one end, and make sure the other + end gets all the data sent before closing. +* Make an event-based socket class. + (doesn't need to be asyncore -- but perhaps model it).