2007-12-02 Complication

* Commit SAM v2 patch from mkvore (thank you!)
    * Minor reformatting to preserve consistent whitespace
      in old SAM classes (new classes unaltered)
This commit is contained in:
complication
2007-12-03 04:19:25 +00:00
committed by zzz
parent 979a3e98d8
commit 17b719f3f7
8 changed files with 1011 additions and 72 deletions

View File

@ -112,12 +112,15 @@ public class SAMHandlerFactory {
case 1:
handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps);
break;
case 2:
handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps);
break;
default:
_log.error("BUG! Trying to initialize the wrong SAM version!");
throw new SAMException("BUG! (in handler instantiation)");
}
} catch (IOException e) {
_log.error("Error creating the v1 handler", e);
_log.error("Error creating the handler for version "+verMajor, e);
throw new SAMException("IOException caught during SAM handler instantiation");
}
return handler;
@ -133,15 +136,16 @@ public class SAMHandlerFactory {
|| (maxMajor == -1) || (maxMinor == -1)) {
return null;
}
if (minMajor > maxMajor) {
return null;
} else if ((minMajor == maxMajor) && (minMinor > maxMinor)) {
return null;
}
if ((minMajor >= 1) && (minMinor >= 0)) {
return "1.0";
}
if ((minMinor >= 10) || (maxMinor >= 10)) return null ;
float fminVer = (float) minMajor + (float) minMinor / 10 ;
float fmaxVer = (float) maxMajor + (float) maxMinor / 10 ;
if ( ( fminVer <= 2.0 ) && ( fmaxVer >= 2.0 ) ) return "2.0" ;
if ( ( fminVer <= 1.0 ) && ( fmaxVer >= 1.0 ) ) return "1.0" ;
return null;
}

View File

@ -15,14 +15,31 @@ import net.i2p.data.Destination;
/**
* Interface for sending streaming data to a SAM client
*/
public interface SAMStreamReceiver {
/**
* Sends the result of a stream send operation
*/
public void streamSendAnswer( int id, String result, String bufferState ) throws IOException;
/**
* Notifies that the outwards buffer is free for writing
*/
public void notifyStreamSendBufferFree( int id ) throws IOException;
/**
* Notify about a new incoming connection
*
* @param id New connection id
*/
public void notifyStreamConnection(int id, Destination dest) throws IOException;
public void notifyStreamIncomingConnection ( int id, Destination dest ) throws IOException;
/**
* Notify about a new outgoing connection
*
* @param id New connection id
*/
public void notifyStreamOutgoingConnection(int id, String result, String msg) throws IOException;
/**
* Send a byte array to a SAM client.

View File

@ -47,13 +47,13 @@ public class SAMStreamSession {
private final static Log _log = new Log(SAMStreamSession.class);
private final static int SOCKET_HANDLER_BUF_SIZE = 32768;
protected final static int SOCKET_HANDLER_BUF_SIZE = 32768;
private SAMStreamReceiver recv = null;
protected SAMStreamReceiver recv = null;
private SAMStreamSessionServer server = null;
private I2PSocketManager socketMgr = null;
protected I2PSocketManager socketMgr = null;
private Object handlersMapLock = new Object();
/** stream id (Long) to SAMStreamSessionSocketReader */
@ -65,13 +65,14 @@ public class SAMStreamSession {
private int lastNegativeId = 0;
// Can we create outgoing connections?
private boolean canCreate = false;
protected boolean canCreate = false;
/**
* should we flush every time we get a STREAM SEND, or leave that up to
* the streaming lib to decide?
*/
private boolean forceFlush = false;
protected boolean forceFlush = false;
public static String PROP_FORCE_FLUSH = "sam.forceFlush";
public static String DEFAULT_FORCE_FLUSH = "false";
@ -189,7 +190,7 @@ public class SAMStreamSession {
* @throws InterruptedIOException if the connection timeouts
* @throws I2PException if there's another I2P-related error
*/
public boolean connect(int id, String dest, Properties props) throws I2PException, ConnectException, NoRouteToHostException, DataFormatException, InterruptedIOException, SAMInvalidDirectionException {
public boolean connect ( int id, String dest, Properties props ) throws I2PException, ConnectException, NoRouteToHostException, DataFormatException, InterruptedIOException, SAMInvalidDirectionException, IOException {
if (!canCreate) {
_log.debug("Trying to create an outgoing connection using a receive-only session");
throw new SAMInvalidDirectionException("Trying to create connections through a receive-only session");
@ -208,10 +209,15 @@ public class SAMStreamSession {
opts.setConnectTimeout(60 * 1000);
_log.debug("Connecting new I2PSocket...");
// blocking connection (SAMv1)
I2PSocket i2ps = socketMgr.connect(d, opts);
createSocketHandler(i2ps, id);
recv.notifyStreamOutgoingConnection ( id, "OK", null );
return true;
}
@ -277,7 +283,7 @@ public class SAMStreamSession {
*
* @return An id associated to the socket handler
*/
private int createSocketHandler(I2PSocket s, int id) {
protected int createSocketHandler ( I2PSocket s, int id ) {
SAMStreamSessionSocketReader reader = null;
StreamSender sender = null;
if (id == 0) {
@ -285,8 +291,8 @@ public class SAMStreamSession {
}
try {
reader = new SAMStreamSessionSocketReader(s, id);
sender = new StreamSender(s, id);
reader = newSAMStreamSessionSocketReader(s, id);
sender = newStreamSender(s, id);
} catch (IOException e) {
_log.error("IOException when creating SAM STREAM session socket handler", e);
recv.stopStreamReceiving();
@ -318,7 +324,7 @@ public class SAMStreamSession {
*
* @param id Handler id
*/
private SAMStreamSessionSocketReader getSocketReader(int id) {
protected SAMStreamSessionSocketReader getSocketReader ( int id ) {
synchronized (handlersMapLock) {
return (SAMStreamSessionSocketReader)handlersMap.get(new Integer(id));
}
@ -334,7 +340,7 @@ public class SAMStreamSession {
*
* @param id Handler id
*/
private boolean checkSocketHandlerId(int id) {
protected boolean checkSocketHandlerId ( int id ) {
synchronized (handlersMapLock) {
return (!(handlersMap.get(new Integer(id)) == null));
}
@ -345,7 +351,7 @@ public class SAMStreamSession {
*
* @param id Handler id to be removed
*/
private void removeSocketHandler(int id) {
protected void removeSocketHandler ( int id ) {
SAMStreamSessionSocketReader reader = null;
StreamSender sender = null;
@ -446,7 +452,8 @@ public class SAMStreamSession {
}
_log.debug("New connection id: " + id);
recv.notifyStreamConnection(id, i2ps.getPeerDestination());
recv.notifyStreamIncomingConnection ( id, i2ps.getPeerDestination() );
} catch (I2PException e) {
_log.debug("Caught I2PException", e);
break;
@ -469,29 +476,62 @@ public class SAMStreamSession {
}
boolean setReceiveLimit ( int id, long limit, boolean nolimit )
{
_log.debug ( "Protocol v1 does not support a receive limit for streams" );
return false ;
}
/**
* SAM STREAM socket handler, running in its own thread. It forwards
* SAM STREAM socket reader, running in its own thread. It forwards
* forward data to/from an I2P socket.
*
* @author human
*/
public class SAMStreamSessionSocketReader implements Runnable {
private I2PSocket i2pSocket = null;
protected I2PSocket i2pSocket = null;
private Object runningLock = new Object();
private boolean stillRunning = true;
protected Object runningLock = new Object();
protected boolean stillRunning = true;
protected int id;
private int id;
/**
* Create a new SAM STREAM session socket reader
*
* @param s Socket to be handled
* @param id Unique id assigned to the handler
*/
public SAMStreamSessionSocketReader(I2PSocket s, int id) throws IOException {
_log.debug("Instantiating new SAM STREAM session socket handler");
public SAMStreamSessionSocketReader ( I2PSocket s, int id ) throws IOException {}
/**
* Stop a SAM STREAM session socket reader thread immediately.
*/
public void stopRunning() {}
public void run() {}
}
protected SAMStreamSessionSocketReader
newSAMStreamSessionSocketReader ( I2PSocket s, int id ) throws IOException {
return new SAMv1StreamSessionSocketReader ( s, id );
}
public class SAMv1StreamSessionSocketReader extends SAMStreamSessionSocketReader {
/**
* Create a new SAM STREAM session socket reader
*
* @param s Socket to be handled
* @param id Unique id assigned to the handler
*/
public SAMv1StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException {
super(s, id);
_log.debug("Instantiating new SAM STREAM session socket reader");
i2pSocket = s;
this.id = id;
@ -507,6 +547,7 @@ public class SAMStreamSession {
if (stillRunning) {
stillRunning = false;
}
runningLock.notifyAll() ;
}
}
@ -558,7 +599,40 @@ public class SAMStreamSession {
* Lets us push data through the stream without blocking, (even after exceeding
* the I2PSocket's buffer)
*/
private class StreamSender implements Runnable {
protected class StreamSender implements Runnable {
public StreamSender ( I2PSocket s, int id ) throws IOException {}
/**
* Send bytes through the SAM STREAM session socket sender
*
* @param data Data to be sent
*
* @throws IOException if the client didnt provide enough data
*/
public void sendBytes ( InputStream in, int size ) throws IOException {}
/**
* Stop a SAM STREAM session socket sender thread immediately
*
*/
public void stopRunning() {}
/**
* Stop a SAM STREAM session socket sender gracefully: stop the
* sender thread once all pending data has been sent.
*/
public void shutDownGracefully() {}
public void run() {}
}
protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException {
return new v1StreamSender ( s, id ) ;
}
protected class v1StreamSender extends StreamSender
{
private List _data;
private int _id;
private ByteCache _cache;
@ -567,7 +641,8 @@ public class SAMStreamSession {
private Object runningLock = new Object();
private I2PSocket i2pSocket = null;
public StreamSender(I2PSocket s, int id) throws IOException {
public v1StreamSender ( I2PSocket s, int id ) throws IOException {
super ( s, id );
_data = new ArrayList(1);
_id = id;
_cache = ByteCache.getInstance(4, 32*1024);

View File

@ -35,6 +35,8 @@ import net.i2p.util.Log;
* @author human
*/
public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver {
protected int verMajorId = 1;
protected int verMinorId = 0;
private final static Log _log = new Log(SAMv1Handler.class);
@ -42,7 +44,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
private SAMRawSession rawSession = null;
private SAMDatagramSession datagramSession = null;
private SAMStreamSession streamSession = null;
protected SAMStreamSession streamSession = null;
private long _id;
private static volatile long __id = 0;
@ -74,11 +76,15 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
_id = ++__id;
_log.debug("SAM version 1 handler instantiated");
if ((this.verMajor != 1) || (this.verMinor != 0)) {
if ( ! verifVersion() ) {
throw new SAMException("BUG! Wrong protocol version!");
}
}
public boolean verifVersion() {
return ( verMajor == 1 && verMinor == 0 ) ;
}
public void handle() {
String msg = null;
String domain = null;
@ -248,7 +254,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
props.remove("DIRECTION");
streamSession = new SAMStreamSession(destKeystream, dir,props,this);
streamSession = newSAMStreamSession(destKeystream, dir,props);
} else {
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
@ -275,6 +281,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
}
SAMStreamSession newSAMStreamSession(String destKeystream, String direction, Properties props )
throws IOException, DataFormatException, SAMException
{
return new SAMStreamSession(destKeystream, direction, props, this) ;
}
/* Parse and execute a DEST message*/
private boolean execDestMessage(String opcode, Properties props) {
@ -489,7 +502,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a STREAM message */
private boolean execStreamMessage(String opcode, Properties props) {
protected boolean execStreamMessage(String opcode, Properties props) {
if (streamSession == null) {
_log.error("STREAM message received, but no STREAM session exists");
return false;
@ -508,7 +521,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
}
private boolean execStreamSend(Properties props) {
protected boolean execStreamSend(Properties props) {
if (props == null) {
_log.debug("No parameters specified in STREAM SEND message");
return false;
@ -570,7 +583,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
}
private boolean execStreamConnect(Properties props) {
protected boolean execStreamConnect(Properties props) {
if (props == null) {
_log.debug("No parameters specified in STREAM CONNECT message");
return false;
@ -604,39 +617,38 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
props.remove("DESTINATION");
try {
if (!streamSession.connect(id, dest, props)) {
_log.debug("STREAM connection failed");
return false;
try {
if (!streamSession.connect(id, dest, props)) {
_log.debug("STREAM connection failed");
return false;
}
} catch (DataFormatException e) {
_log.debug("Invalid destination in STREAM CONNECT message");
notifyStreamOutgoingConnection ( id, "INVALID_KEY", null );
} catch (SAMInvalidDirectionException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamOutgoingConnection ( id, "INVALID_DIRECTION", null );
} catch (ConnectException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamOutgoingConnection ( id, "CONNECTION_REFUSED", null );
} catch (NoRouteToHostException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamOutgoingConnection ( id, "CANT_REACH_PEER", null );
} catch (InterruptedIOException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamOutgoingConnection ( id, "TIMEOUT", null );
} catch (I2PException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
notifyStreamOutgoingConnection ( id, "I2P_ERROR", null );
}
return writeString("STREAM STATUS RESULT=OK ID=" + id + "\n");
} catch (DataFormatException e) {
_log.debug("Invalid destination in STREAM CONNECT message");
return writeString("STREAM STATUS RESULT=INVALID_KEY ID="
+ id + "\n");
} catch (SAMInvalidDirectionException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
return writeString("STREAM STATUS RESULT=INVALID_DIRECTION ID="
+ id + "\n");
} catch (ConnectException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
return writeString("STREAM STATUS RESULT=CONNECTION_REFUSED ID="
+ id + "\n");
} catch (NoRouteToHostException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
return writeString("STREAM STATUS RESULT=CANT_REACH_PEER ID="
+ id + "\n");
} catch (InterruptedIOException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
return writeString("STREAM STATUS RESULT=TIMEOUT ID="
+ id + "\n");
} catch (I2PException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
return writeString("STREAM STATUS RESULT=I2P_ERROR ID="
+ id + "\n");
} catch (IOException e) {
return false ;
}
return true ;
}
private boolean execStreamClose(Properties props) {
protected boolean execStreamClose(Properties props) {
if (props == null) {
_log.debug("No parameters specified in STREAM CLOSE message");
return false;
@ -745,7 +757,41 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
// SAMStreamReceiver implementation
public void notifyStreamConnection(int id, Destination d) throws IOException {
public void streamSendAnswer( int id, String result, String bufferState ) throws IOException
{
if ( streamSession == null )
{
_log.error ( "BUG! Want to answer to stream SEND, but session is null!" );
throw new NullPointerException ( "BUG! STREAM session is null!" );
}
if ( !writeString ( "STREAM SEND ID=" + id
+ " RESULT=" + result
+ " STATE=" + bufferState
+ "\n" ) )
{
throw new IOException ( "Error notifying connection to SAM client" );
}
}
public void notifyStreamSendBufferFree( int id ) throws IOException
{
if ( streamSession == null )
{
_log.error ( "BUG! Stream outgoing buffer is free, but session is null!" );
throw new NullPointerException ( "BUG! STREAM session is null!" );
}
if ( !writeString ( "STREAM READY_TO_SEND ID=" + id + "\n" ) )
{
throw new IOException ( "Error notifying connection to SAM client" );
}
}
public void notifyStreamIncomingConnection(int id, Destination d) throws IOException {
if (streamSession == null) {
_log.error("BUG! Received stream connection, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
@ -758,6 +804,28 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
}
public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException
{
if ( streamSession == null )
{
_log.error ( "BUG! Received stream connection, but session is null!" );
throw new NullPointerException ( "BUG! STREAM session is null!" );
}
String msgString = "" ;
if ( msg != null ) msgString = " MESSAGE=\"" + msg + "\"";
if ( !writeString ( "STREAM STATUS RESULT="
+ result
+ " ID=" + id
+ msgString
+ "\n" ) )
{
throw new IOException ( "Error notifying connection to SAM client" );
}
}
public void receiveStreamBytes(int id, byte data[], int len) throws IOException {
if (streamSession == null) {
_log.error("Received stream bytes, but session is null!");

View File

@ -0,0 +1,196 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.util.Properties;
import java.util.StringTokenizer;
import net.i2p.I2PException;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
/**
* Class able to handle a SAM version 2 client connection.
*
* @author mkvore
*/
public class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver
{
private final static Log _log = new Log ( SAMv2Handler.class );
/**
* Create a new SAM version 2 handler. This constructor expects
* that the SAM HELLO message has been still answered (and
* stripped) from the socket input stream.
*
* @param s Socket attached to a SAM client
* @param verMajor SAM major version to manage (should be 2)
* @param verMinor SAM minor version to manage
*/
public SAMv2Handler ( Socket s, int verMajor, int verMinor ) throws SAMException, IOException
{
this ( s, verMajor, verMinor, new Properties() );
}
/**
* Create a new SAM version 2 handler. This constructor expects
* that the SAM HELLO message has been still answered (and
* stripped) from the socket input stream.
*
* @param s Socket attached to a SAM client
* @param verMajor SAM major version to manage (should be 2)
* @param verMinor SAM minor version to manage
* @param i2cpProps properties to configure the I2CP connection (host, port, etc)
*/
public SAMv2Handler ( Socket s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
{
super ( s, verMajor, verMinor, i2cpProps );
}
public boolean verifVersion()
{
return (verMajor == 2 && verMinor == 0) ;
}
SAMStreamSession newSAMStreamSession(String destKeystream, String direction, Properties props )
throws IOException, DataFormatException, SAMException
{
return new SAMv2StreamSession(destKeystream, direction, props, this) ;
}
/* Parse and execute a STREAM message */
protected boolean execStreamMessage ( String opcode, Properties props )
{
if ( streamSession == null )
{
_log.error ( "STREAM message received, but no STREAM session exists" );
return false;
}
if ( opcode.equals ( "SEND" ) )
{
return execStreamSend ( props );
}
else if ( opcode.equals ( "CONNECT" ) )
{
return execStreamConnect ( props );
}
else if ( opcode.equals ( "CLOSE" ) )
{
return execStreamClose ( props );
}
else if ( opcode.equals ( "RECEIVE") )
{
return execStreamReceive( props );
}
else
{
_log.debug ( "Unrecognized RAW message opcode: \""
+ opcode + "\"" );
return false;
}
}
private boolean execStreamReceive ( Properties props )
{
if ( props == null )
{
_log.debug ( "No parameters specified in STREAM RECEIVE message" );
return false;
}
int id;
{
String strid = props.getProperty ( "ID" );
if ( strid == null )
{
_log.debug ( "ID not specified in STREAM RECEIVE message" );
return false;
}
try
{
id = Integer.parseInt ( strid );
}
catch ( NumberFormatException e )
{
_log.debug ( "Invalid STREAM RECEIVE ID specified: " + strid );
return false;
}
}
boolean nolimit = false;
long limit = 0;
{
String strsize = props.getProperty ( "LIMIT" );
if ( strsize == null )
{
_log.debug ( "Limit not specified in STREAM RECEIVE message" );
return false;
}
if ( strsize.equals( "NONE" ) )
{
nolimit = true ;
}
else
{
try
{
limit = Long.parseLong ( strsize );
}
catch ( NumberFormatException e )
{
_log.debug ( "Invalid STREAM RECEIVE size specified: " + strsize );
return false;
}
if ( limit < 0 )
{
_log.debug ( "Specified limit (" + limit
+ ") is out of protocol limits" );
return false;
}
}
}
streamSession.setReceiveLimit ( id, limit, nolimit ) ;
return true;
}
}

View File

@ -0,0 +1,574 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import net.i2p.I2PException;
import net.i2p.client.I2PClient;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.ByteCache;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* SAMv2 STREAM session class.
*
* @author mkvore
*/
public class SAMv2StreamSession extends SAMStreamSession
{
private final static Log _log = new Log ( SAMv2StreamSession.class );
/**
* Create a new SAM STREAM session.
*
* @param dest Base64-encoded destination (private key)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
*/
public SAMv2StreamSession ( String dest, String dir, Properties props,
SAMStreamReceiver recv ) throws IOException, DataFormatException, SAMException
{
super ( dest, dir, props, recv );
}
/**
* Create a new SAM STREAM session.
*
* @param destStream Input stream containing the destination keys
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
*/
public SAMv2StreamSession ( InputStream destStream, String dir,
Properties props, SAMStreamReceiver recv ) throws IOException, DataFormatException, SAMException
{
super ( destStream, dir, props, recv );
}
/**
* Connect the SAM STREAM session to the specified Destination
*
* @param id Unique id for the connection
* @param dest Base64-encoded Destination to connect to
* @param props Options to be used for connection
*
* @throws DataFormatException if the destination is not valid
* @throws SAMInvalidDirectionException if trying to connect through a
* receive-only session
* @return true if the communication with the SAM client is ok
*/
public boolean connect ( int id, String dest, Properties props )
throws DataFormatException, SAMInvalidDirectionException
{
if ( !canCreate )
{
_log.debug ( "Trying to create an outgoing connection using a receive-only session" );
throw new SAMInvalidDirectionException ( "Trying to create connections through a receive-only session" );
}
if ( checkSocketHandlerId ( id ) )
{
_log.debug ( "The specified id (" + id + ") is already in use" );
return false ;
}
Destination d = new Destination();
d.fromBase64 ( dest );
I2PSocketOptions opts = socketMgr.buildOptions ( props );
if ( props.getProperty ( I2PSocketOptions.PROP_CONNECT_TIMEOUT ) == null )
opts.setConnectTimeout ( 60 * 1000 );
_log.debug ( "Connecting new I2PSocket..." );
// non-blocking connection (SAMv2)
StreamConnector connector ;
connector = new StreamConnector ( id, d, opts );
I2PThread connectThread = new I2PThread ( connector, "StreamConnector" + id ) ;
connectThread.start() ;
return true ;
}
/**
* SAM STREAM socket connecter, running in its own thread.
*
* @author mkvore
*/
public class StreamConnector implements Runnable
{
private Object runningLock = new Object();
private boolean stillRunning = true;
private int id;
private Destination dest ;
private I2PSocketOptions opts ;
/**
* Create a new SAM STREAM session socket reader
*
* @param id Unique id assigned to the handler
* @param dest Destination to reach
* @param opts Socket options (I2PSocketOptions)
*/
public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException
{
_log.debug ( "Instantiating new SAM STREAM connector" );
this.id = id ;
this.opts = opts ;
this.dest = dest ;
}
public void run()
{
_log.debug ( "run() called for socket connector " + id );
try
{
try
{
I2PSocket i2ps = socketMgr.connect ( dest, opts );
createSocketHandler ( i2ps, id );
recv.notifyStreamOutgoingConnection ( id, "OK", null );
}
catch ( DataFormatException e )
{
_log.debug ( "Invalid destination in STREAM CONNECT message" );
recv.notifyStreamOutgoingConnection ( id, "INVALID_KEY", e.getMessage() );
}
catch ( ConnectException e )
{
_log.debug ( "STREAM CONNECT failed: " + e.getMessage() );
recv.notifyStreamOutgoingConnection ( id, "CONNECTION_REFUSED", e.getMessage() );
}
catch ( NoRouteToHostException e )
{
_log.debug ( "STREAM CONNECT failed: " + e.getMessage() );
recv.notifyStreamOutgoingConnection ( id, "CANT_REACH_PEER", e.getMessage() );
}
catch ( InterruptedIOException e )
{
_log.debug ( "STREAM CONNECT failed: " + e.getMessage() );
recv.notifyStreamOutgoingConnection ( id, "TIMEOUT", e.getMessage() );
}
catch ( I2PException e )
{
_log.debug ( "STREAM CONNECT failed: " + e.getMessage() );
recv.notifyStreamOutgoingConnection ( id, "I2P_ERROR", e.getMessage() );
}
}
catch ( IOException e )
{
_log.debug ( "Error sending disconnection notice for handler "
+ id, e );
}
_log.debug ( "Shutting down SAM STREAM session connector " + id );
}
}
/**
* Lets us push data through the stream without blocking, (even after exceeding
* the I2PSocket's buffer)
*/
protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException
{
return new v2StreamSender ( s, id ) ;
}
protected SAMStreamSessionSocketReader
newSAMStreamSessionSocketReader(I2PSocket s, int id ) throws IOException
{
return new SAMv2StreamSessionSocketReader(s,id);
}
protected class v2StreamSender extends StreamSender
{
private List _data;
private int _dataSize;
private int _id;
private ByteCache _cache;
private OutputStream _out = null;
private boolean _stillRunning, _shuttingDownGracefully;
private Object runningLock = new Object();
private I2PSocket i2pSocket = null;
public v2StreamSender ( I2PSocket s, int id ) throws IOException
{
super ( s, id );
_data = new ArrayList ( 1 );
_dataSize = 0;
_id = id;
_cache = ByteCache.getInstance ( 10, 32 * 1024 );
_out = s.getOutputStream();
_stillRunning = true;
_shuttingDownGracefully = false;
i2pSocket = s;
}
/**
* Send bytes through the SAM STREAM session socket sender
*
* @param data Data to be sent
*
* @throws IOException if the client didnt provide enough data
*/
public void sendBytes ( InputStream in, int size ) throws IOException
{
if ( _log.shouldLog ( Log.DEBUG ) )
_log.debug ( "Handler " + _id + ": sending " + size + " bytes" );
ByteArray ba = _cache.acquire();
int read = DataHelper.read ( in, ba.getData(), 0, size );
if ( read != size )
throw new IOException ( "Insufficient data from the SAM client (" + read + "/" + size + ")" );
ba.setValid ( read );
synchronized ( _data )
{
if ( _dataSize >= SOCKET_HANDLER_BUF_SIZE )
{
_cache.release ( ba, false );
recv.streamSendAnswer ( _id, "FAILED", "BUFFER_FULL" ) ;
}
else
{
_dataSize += size ;
_data.add ( ba );
_data.notifyAll();
if ( _dataSize >= SOCKET_HANDLER_BUF_SIZE )
{
recv.streamSendAnswer ( _id, "OK", "BUFFER_FULL" ) ;
}
else
{
recv.streamSendAnswer ( _id, "OK", "READY" );
}
}
}
}
/**
* Stop a SAM STREAM session socket sender thread immediately
*
*/
public void stopRunning()
{
_log.debug ( "stopRunning() invoked on socket sender " + _id );
synchronized ( runningLock )
{
if ( _stillRunning )
{
_stillRunning = false;
try
{
i2pSocket.close();
}
catch ( IOException e )
{
_log.debug ( "Caught IOException", e );
}
synchronized ( _data )
{
_data.clear();
_data.notifyAll();
}
}
}
}
/**
* Stop a SAM STREAM session socket sender gracefully: stop the
* sender thread once all pending data has been sent.
*/
public void shutDownGracefully()
{
_log.debug ( "shutDownGracefully() invoked on socket sender " + _id );
_shuttingDownGracefully = true;
}
public void run()
{
_log.debug ( "run() called for socket sender " + _id );
ByteArray data = null;
while ( _stillRunning )
{
data = null;
try
{
synchronized ( _data )
{
if ( _data.size() > 0 )
{
int formerSize = _dataSize ;
data = ( ByteArray ) _data.remove ( 0 );
_dataSize -= data.getValid();
if ( ( formerSize >= SOCKET_HANDLER_BUF_SIZE ) && ( _dataSize < SOCKET_HANDLER_BUF_SIZE ) )
recv.notifyStreamSendBufferFree ( _id );
}
else if ( _shuttingDownGracefully )
{
/* No data left and shutting down gracefully?
If so, stop the sender. */
stopRunning();
break;
}
else
{
/* Wait for data. */
_data.wait ( 5000 );
}
}
if ( data != null )
{
try
{
_out.write ( data.getData(), 0, data.getValid() );
if ( forceFlush )
{
// i dont like doing this, but it clears the buffer issues
_out.flush();
}
}
catch ( IOException ioe )
{
// ok, the stream failed, but the SAM client didn't
if ( _log.shouldLog ( Log.WARN ) )
_log.warn ( "Stream failed", ioe );
removeSocketHandler ( _id );
stopRunning();
}
finally
{
_cache.release ( data, false );
}
}
}
catch ( InterruptedException ie ) {}
catch ( IOException e ) {}}
synchronized ( _data )
{
_data.clear();
}
}
}
/**
* Send bytes through a SAM STREAM session.
*
* @param data Bytes to be sent
*
* @return True if the data was queued for sending, false otherwise
*/
public boolean setReceiveLimit ( int id, long limit, boolean nolimit )
{
SAMStreamSessionSocketReader reader = getSocketReader ( id );
if ( reader == null )
{
if ( _log.shouldLog ( Log.WARN ) )
_log.warn ( "Trying to set a limit to a nonexistent reader " + id );
return false;
}
( (SAMv2StreamSessionSocketReader) reader).setLimit ( limit, nolimit );
return true;
}
/**
* SAM STREAM socket reader, running in its own thread. It forwards
* forward data to/from an I2P socket.
*
* @author human
*/
public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader
{
protected boolean nolimit ;
protected long limit ;
protected long totalReceived ;
/**
* Create a new SAM STREAM session socket reader
*
* @param s Socket to be handled
* @param id Unique id assigned to the handler
*/
public SAMv2StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException
{
super ( s, id );
nolimit = false ;
limit = 0 ;
totalReceived = 0 ;
}
public void setLimit ( long limit, boolean nolimit )
{
synchronized (runningLock)
{
this.limit = limit ;
this.nolimit = nolimit ;
runningLock.notify() ;
}
_log.debug ( "new limit set for socket reader " + id + " : " + (nolimit ? "NOLIMIT" : limit + " bytes" ) );
}
public void run()
{
_log.debug ( "run() called for socket reader " + id );
int read = -1;
byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
try
{
InputStream in = i2pSocket.getInputStream();
while ( stillRunning )
{
synchronized (runningLock)
{
while ( stillRunning && ( !nolimit && totalReceived >= limit) )
{
try{
runningLock.wait() ;
}
catch (InterruptedException ie)
{}
}
if ( !stillRunning )
break ;
}
read = in.read ( data );
if ( read == -1 )
{
_log.debug ( "Handler " + id + ": connection closed" );
break;
}
totalReceived += read ;
recv.receiveStreamBytes ( id, data, read );
}
}
catch ( IOException e )
{
_log.debug ( "Caught IOException", e );
}
try
{
i2pSocket.close();
}
catch ( IOException e )
{
_log.debug ( "Caught IOException", e );
}
if ( stillRunning )
{
removeSocketHandler ( id );
// FIXME: we need error reporting here!
try
{
recv.notifyStreamDisconnection ( id, "OK", null );
}
catch ( IOException e )
{
_log.debug ( "Error sending disconnection notice for handler "
+ id, e );
}
}
_log.debug ( "Shutting down SAM STREAM session socket handler " + id );
}
}
}

View File

@ -1,4 +1,9 @@
$Id: history.txt,v 1.598 2007-11-26 16:54:00 zzz Exp $
$Id: history.txt,v 1.599 2007-12-01 22:13:15 complication Exp $
2007-12-02 Complication
* Commit SAM v2 patch from mkvore (thank you!)
* Minor reformatting to preserve consistent whitespace
in old SAM classes (new classes unaltered)
2007-12-01 Complication
* Separate the checks "does Jetty .zip file need downloading"

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.533 $ $Date: 2007-11-26 16:53:58 $";
public final static String ID = "$Revision: 1.534 $ $Date: 2007-12-01 22:13:18 $";
public final static String VERSION = "0.6.1.30";
public final static long BUILD = 5;
public final static long BUILD = 6;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);