2004-10-24 01:42:34 +00:00
/ *
* licensed under BSD license . . .
* ( if you know the proper clause for that , add it . . . )
* /
package net.i2p.client.streaming ;
import java.io.ByteArrayOutputStream ;
import java.io.IOException ;
import java.io.InterruptedIOException ;
import java.io.UnsupportedEncodingException ;
import java.net.ConnectException ;
import java.net.NoRouteToHostException ;
2005-01-06 00:17:53 +00:00
import java.util.ArrayList ;
2004-10-24 01:42:34 +00:00
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.Iterator ;
2005-01-06 00:17:53 +00:00
import java.util.List ;
2004-10-24 23:00:44 +00:00
import java.util.Properties ;
2004-10-24 01:42:34 +00:00
import java.util.Set ;
import net.i2p.I2PAppContext ;
import net.i2p.I2PException ;
import net.i2p.client.I2PSession ;
import net.i2p.client.I2PSessionException ;
import net.i2p.client.I2PSessionListener ;
import net.i2p.data.Base64 ;
import net.i2p.data.DataFormatException ;
import net.i2p.data.Destination ;
import net.i2p.util.Log ;
/ * *
* Centralize the coordination and multiplexing of the local client ' s streaming .
* There should be one I2PSocketManager for each I2PSession , and if an application
* is sending and receiving data through the streaming library using an
* I2PSocketManager , it should not attempt to call I2PSession ' s setSessionListener
* or receive any messages with its . receiveMessage
*
* /
2004-11-25 21:57:19 +00:00
class I2PSocketManagerImpl implements I2PSocketManager , I2PSessionListener {
2004-10-24 01:42:34 +00:00
private I2PAppContext _context ;
private Log _log ;
private I2PSession _session ;
private I2PServerSocketImpl _serverSocket = null ;
private Object lock = new Object ( ) ; // for locking socket lists
private HashMap _outSockets ;
private HashMap _inSockets ;
private I2PSocketOptions _defaultOptions ;
private long _acceptTimeout ;
private String _name ;
2005-01-06 00:17:53 +00:00
private List _listeners ;
2004-10-24 01:42:34 +00:00
private static int __managerId = 0 ;
public static final short ACK = 0x51 ;
public static final short CLOSE_OUT = 0x52 ;
public static final short DATA_OUT = 0x50 ;
public static final short SYN = 0xA1 ;
public static final short CLOSE_IN = 0xA2 ;
public static final short DATA_IN = 0xA0 ;
public static final short CHAFF = 0xFF ;
/ * *
* How long to wait for the client app to accept ( ) before sending back CLOSE ?
* This includes the time waiting in the queue . Currently set to 5 seconds .
* /
private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000 ;
public I2PSocketManagerImpl ( ) {
this ( " SocketManager " + ( + + __managerId ) ) ;
}
public I2PSocketManagerImpl ( String name ) {
2004-10-24 23:00:44 +00:00
init ( I2PAppContext . getGlobalContext ( ) , null , null , name ) ;
}
public void init ( I2PAppContext context , I2PSession session , Properties opts , String name ) {
2004-10-24 01:42:34 +00:00
_name = name ;
2004-10-24 23:00:44 +00:00
_context = context ;
_log = _context . logManager ( ) . getLog ( I2PSocketManager . class ) ;
2004-10-24 01:42:34 +00:00
_inSockets = new HashMap ( 16 ) ;
_outSockets = new HashMap ( 16 ) ;
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT ;
2005-01-06 00:17:53 +00:00
_listeners = new ArrayList ( 1 ) ;
2004-10-24 23:00:44 +00:00
setSession ( session ) ;
2004-11-16 22:11:11 +00:00
setDefaultOptions ( buildOptions ( opts ) ) ;
2004-10-24 01:42:34 +00:00
_context . statManager ( ) . createRateStat ( " streaming.lifetime " , " How long before the socket is closed? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
_context . statManager ( ) . createRateStat ( " streaming.sent " , " How many bytes are sent in the stream? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
_context . statManager ( ) . createRateStat ( " streaming.received " , " How many bytes are received in the stream? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
_context . statManager ( ) . createRateStat ( " streaming.transferBalance " , " How many streams send more than they receive (positive means more sent, negative means more received)? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
_context . statManager ( ) . createRateStat ( " streaming.synNoAck " , " How many times have we sent a SYN but not received an ACK? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
_context . statManager ( ) . createRateStat ( " streaming.ackSendFailed " , " How many times have we tried to send an ACK to a SYN and failed? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
_context . statManager ( ) . createRateStat ( " streaming.nackSent " , " How many times have we refused a SYN with a NACK? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
_context . statManager ( ) . createRateStat ( " streaming.nackReceived " , " How many times have we received a NACK to our SYN? " , " streaming " , new long [ ] { 10 * 60 * 1000 , 60 * 60 * 1000 , 24 * 60 * 60 * 1000 } ) ;
}
public I2PSession getSession ( ) {
return _session ;
}
public void setSession ( I2PSession session ) {
_session = session ;
if ( session ! = null ) session . setSessionListener ( this ) ;
}
/ * *
* How long should we wait for the client to . accept ( ) a socket before
* sending back a NACK / Close ?
*
* @param ms milliseconds to wait , maximum
* /
public void setAcceptTimeout ( long ms ) { _acceptTimeout = ms ; }
public long getAcceptTimeout ( ) { return _acceptTimeout ; }
public void disconnected ( I2PSession session ) {
_log . info ( getName ( ) + " : Disconnected from the session " ) ;
destroySocketManager ( ) ;
2005-01-06 00:17:53 +00:00
List listeners = null ;
synchronized ( _listeners ) {
listeners = new ArrayList ( _listeners ) ;
_listeners . clear ( ) ;
}
for ( int i = 0 ; i < listeners . size ( ) ; i + + ) {
2005-01-26 06:29:17 +00:00
I2PSocketManager . DisconnectListener lsnr = ( I2PSocketManager . DisconnectListener ) listeners . get ( i ) ;
2005-01-06 00:17:53 +00:00
lsnr . sessionDisconnected ( ) ;
}
2004-10-24 01:42:34 +00:00
}
public void errorOccurred ( I2PSession session , String message , Throwable error ) {
_log . error ( getName ( ) + " : Error occurred: [ " + message + " ] " , error ) ;
}
public void messageAvailable ( I2PSession session , int msgId , long size ) {
try {
I2PSocketImpl s ;
byte msg [ ] = session . receiveMessage ( msgId ) ;
if ( msg . length = = 1 & & msg [ 0 ] = = - 1 ) {
_log . debug ( getName ( ) + " : Ping received " ) ;
return ;
}
if ( msg . length < 4 ) {
_log . warn ( getName ( ) + " : ==== packet too short ==== " ) ;
return ;
}
int type = msg [ 0 ] & 0xff ;
String id = toString ( new byte [ ] { msg [ 1 ] , msg [ 2 ] , msg [ 3 ] } ) ;
byte [ ] payload = new byte [ msg . length - 4 ] ;
System . arraycopy ( msg , 4 , payload , 0 , payload . length ) ;
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : Message read: type = [ " + Integer . toHexString ( type )
+ " ] id = [ " + getReadableForm ( id )
+ " ] payload length: [ " + payload . length + " ] " ) ;
switch ( type ) {
case ACK :
ackAvailable ( id , payload ) ;
return ;
case CLOSE_OUT :
disconnectAvailable ( id , payload ) ;
return ;
case DATA_OUT :
sendOutgoingAvailable ( id , payload ) ;
return ;
case SYN :
synIncomingAvailable ( id , payload , session ) ;
return ;
case CLOSE_IN :
disconnectIncoming ( id , payload ) ;
return ;
case DATA_IN :
sendIncoming ( id , payload ) ;
case CHAFF :
// ignore
return ;
default :
handleUnknown ( type , id , payload ) ;
return ;
}
} catch ( I2PException ise ) {
_log . warn ( getName ( ) + " : Error processing " , ise ) ;
} catch ( IllegalStateException ise ) {
_log . debug ( getName ( ) + " : Error processing " , ise ) ;
}
}
/ * *
* We ' ve received an ACK packet ( hopefully , in response to a SYN that we
* recently sent out ) . Notify the associated I2PSocket that we now have
* the remote stream ID ( which should get things going , since the handshake
* is complete ) .
*
* /
private void ackAvailable ( String id , byte payload [ ] ) {
long begin = _context . clock ( ) . now ( ) ;
I2PSocketImpl s = null ;
synchronized ( lock ) {
s = ( I2PSocketImpl ) _outSockets . get ( id ) ;
}
if ( s = = null ) {
_log . warn ( getName ( ) + " : No socket responsible for ACK packet for id " + getReadableForm ( id ) ) ;
return ;
}
long socketRetrieved = _context . clock ( ) . now ( ) ;
String remoteId = null ;
remoteId = s . getRemoteID ( false ) ;
if ( ( payload . length = = 3 ) & & ( remoteId = = null ) ) {
String newID = toString ( payload ) ;
long beforeSetRemId = _context . clock ( ) . now ( ) ;
s . setRemoteID ( newID ) ;
if ( _log . shouldLog ( Log . DEBUG ) ) {
_log . debug ( getName ( ) + " : ackAvailable - socket retrieval took "
+ ( socketRetrieved - begin ) + " ms, getRemoteId took "
+ ( beforeSetRemId - socketRetrieved ) + " ms, setRemoteId took "
+ ( _context . clock ( ) . now ( ) - beforeSetRemId ) + " ms " ) ;
}
return ;
} else {
// (payload.length != 3 || getRemoteId != null)
if ( _log . shouldLog ( Log . WARN ) ) {
if ( payload . length ! = 3 )
_log . warn ( getName ( ) + " : Ack packet had " + payload . length + " bytes " ) ;
else
_log . warn ( getName ( ) + " : Remote ID already exists? " + remoteId ) ;
}
if ( _log . shouldLog ( Log . DEBUG ) ) {
_log . debug ( getName ( ) + " : invalid ack - socket retrieval took "
+ ( socketRetrieved - begin ) + " ms, overall took "
+ ( _context . clock ( ) . now ( ) - begin ) + " ms " ) ;
}
return ;
}
}
/ * *
* We received a disconnect packet , telling us to tear down the specified
* stream .
* /
private void disconnectAvailable ( String id , byte payload [ ] ) {
I2PSocketImpl s = null ;
synchronized ( lock ) {
s = ( I2PSocketImpl ) _outSockets . get ( id ) ;
}
_log . debug ( getName ( ) + " : *Disconnect outgoing for socket " + s + " on id "
+ getReadableForm ( id ) ) ;
try {
if ( s ! = null ) {
if ( payload . length > 0 ) {
_log . debug ( getName ( ) + " : Disconnect packet had "
+ payload . length + " bytes " ) ;
}
if ( s . getRemoteID ( false ) = = null ) {
s . setRemoteID ( null ) ; // Just to wake up socket
return ;
}
s . internalClose ( ) ;
synchronized ( lock ) {
_outSockets . remove ( id ) ;
}
}
return ;
} catch ( Exception t ) {
_log . warn ( getName ( ) + " : Ignoring error on disconnect for socket " + s , t ) ;
}
}
/ * *
* We ' ve received data on a stream we created - toss the data onto
* the socket for handling .
*
* @throws IllegalStateException if the socket isn ' t open or isn ' t known
* /
private void sendOutgoingAvailable ( String id , byte payload [ ] ) throws IllegalStateException {
I2PSocketImpl s = null ;
synchronized ( lock ) {
s = ( I2PSocketImpl ) _outSockets . get ( id ) ;
}
// packet send outgoing
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : *Packet send outgoing [ " + payload . length + " ] for socket "
+ s + " on id " + getReadableForm ( id ) ) ;
if ( s ! = null ) {
s . queueData ( payload ) ;
return ;
} else {
if ( _log . shouldLog ( Log . WARN ) )
_log . warn ( getName ( ) + " : Null socket with data available " ) ;
throw new IllegalStateException ( " Null socket with data available " ) ;
}
}
/ * *
* We ' ve received a SYN packet ( a request for a new stream ) . If the client has
* said they want incoming sockets ( by retrieving the serverSocket ) , the stream
* will be ACKed , but if they have not , they ' ll be NACKed )
*
* @throws DataFormatException if the destination in the SYN was invalid
* @throws I2PSessionException if there was an I2P error sending the ACK or NACK
* /
private void synIncomingAvailable ( String id , byte payload [ ] , I2PSession session )
throws DataFormatException , I2PSessionException {
Destination d = new Destination ( ) ;
d . fromByteArray ( payload ) ;
I2PSocketImpl s = null ;
boolean acceptConnections = ( _serverSocket ! = null ) ;
String newLocalID = null ;
synchronized ( lock ) {
newLocalID = makeID ( _inSockets ) ;
if ( acceptConnections ) {
s = new I2PSocketImpl ( d , this , false , newLocalID ) ;
s . setRemoteID ( id ) ;
}
}
_log . debug ( getName ( ) + " : *Syn! for socket " + s + " on id " + getReadableForm ( newLocalID )
+ " from " + d . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 ) ) ;
if ( ! acceptConnections ) {
// The app did not instantiate an I2PServerSocket
byte [ ] packet = makePacket ( ( byte ) CLOSE_OUT , id , toBytes ( newLocalID ) ) ;
boolean replySentOk = false ;
synchronized ( _session ) {
replySentOk = _session . sendMessage ( d , packet ) ;
}
if ( ! replySentOk ) {
_log . warn ( getName ( ) + " : Error sending close to " + d . calculateHash ( ) . toBase64 ( )
+ " in response to a new con message " ,
new Exception ( " Failed creation " ) ) ;
}
_context . statManager ( ) . addRateData ( " streaming.nackSent " , 1 , 1 ) ;
return ;
}
if ( _serverSocket . addWaitForAccept ( s , _acceptTimeout ) ) {
_inSockets . put ( newLocalID , s ) ;
byte [ ] packet = makePacket ( ( byte ) ACK , id , toBytes ( newLocalID ) ) ;
boolean replySentOk = false ;
replySentOk = _session . sendMessage ( d , packet ) ;
if ( ! replySentOk ) {
if ( _log . shouldLog ( Log . WARN ) )
_log . warn ( getName ( ) + " : Error sending reply to " + d . calculateHash ( ) . toBase64 ( )
+ " in response to a new con message for socket " + s ,
new Exception ( " Failed creation " ) ) ;
s . internalClose ( ) ;
_context . statManager ( ) . addRateData ( " streaming.ackSendFailed " , 1 , 1 ) ;
}
} else {
// timed out or serverSocket closed
byte [ ] packet = toBytes ( " " + id ) ;
packet [ 0 ] = CLOSE_OUT ;
boolean nackSent = session . sendMessage ( d , packet ) ;
if ( ! nackSent ) {
_log . warn ( getName ( ) + " : Error sending NACK for session creation for socket " + s ) ;
}
s . internalClose ( ) ;
_context . statManager ( ) . addRateData ( " streaming,nackSent " , 1 , 1 ) ;
}
return ;
}
/ * *
* We ' ve received a disconnect for a socket we didn ' t initiate , so kill
* the socket .
*
* /
private void disconnectIncoming ( String id , byte payload [ ] ) {
I2PSocketImpl s = null ;
synchronized ( lock ) {
s = ( I2PSocketImpl ) _inSockets . get ( id ) ;
if ( payload . length = = 0 & & s ! = null ) {
_inSockets . remove ( id ) ;
}
}
_log . debug ( getName ( ) + " : *Disconnect incoming for socket " + s ) ;
try {
if ( payload . length = = 0 & & s ! = null ) {
s . internalClose ( ) ;
return ;
} else {
if ( ( payload . length > 0 ) & & ( _log . shouldLog ( Log . ERROR ) ) )
_log . warn ( getName ( ) + " : Disconnect packet had " + payload . length + " bytes " ) ;
if ( s ! = null )
s . internalClose ( ) ;
return ;
}
} catch ( Exception t ) {
_log . warn ( getName ( ) + " : Ignoring error on disconnect " , t ) ;
return ;
}
}
/ * *
* We ' ve received data on a stream we received - toss the data onto
* the socket for handling .
*
* @throws IllegalStateException if the socket isn ' t open or isn ' t known
* /
private void sendIncoming ( String id , byte payload [ ] ) throws IllegalStateException {
I2PSocketImpl s = null ;
synchronized ( lock ) {
s = ( I2PSocketImpl ) _inSockets . get ( id ) ;
}
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : *Packet send incoming [ " + payload . length + " ] for socket " + s ) ;
if ( s ! = null ) {
s . queueData ( payload ) ;
return ;
} else {
_log . info ( getName ( ) + " : Null socket with data available " ) ;
throw new IllegalStateException ( " Null socket with data available " ) ;
}
}
/ * *
* Unknown packet . moo .
*
* /
private void handleUnknown ( int type , String id , byte payload [ ] ) {
_log . error ( getName ( ) + " : \ n \ n=============== Unknown packet! " + " ============ "
+ " \ nType: " + ( int ) type
+ " \ nID: " + getReadableForm ( id )
+ " \ nBase64'ed Data: " + Base64 . encode ( payload )
+ " \ n \ n \ n " ) ;
if ( id ! = null ) {
synchronized ( lock ) {
_inSockets . remove ( id ) ;
_outSockets . remove ( id ) ;
}
}
}
public void reportAbuse ( I2PSession session , int severity ) {
_log . error ( getName ( ) + " : Abuse reported [ " + severity + " ] " ) ;
}
public void setDefaultOptions ( I2PSocketOptions options ) {
_defaultOptions = options ;
}
public I2PSocketOptions getDefaultOptions ( ) {
return _defaultOptions ;
}
2004-11-16 22:11:11 +00:00
public I2PSocketOptions buildOptions ( ) { return buildOptions ( null ) ; }
public I2PSocketOptions buildOptions ( Properties opts ) {
return new I2PSocketOptionsImpl ( opts ) ;
}
2004-10-24 01:42:34 +00:00
public I2PServerSocket getServerSocket ( ) {
if ( _serverSocket = = null ) {
_serverSocket = new I2PServerSocketImpl ( this ) ;
}
return _serverSocket ;
}
/ * *
* Create a new connected socket ( block until the socket is created )
*
* @param peer Destination to connect to
* @param options I2P socket options to be used for connecting
*
* @throws ConnectException if the peer refuses the connection
* @throws NoRouteToHostException if the peer is not found or not reachable
* @throws InterruptedIOException if the connection timeouts
* @throws I2PException if there is some other I2P - related problem
* /
public I2PSocket connect ( Destination peer , I2PSocketOptions options )
throws I2PException , ConnectException ,
NoRouteToHostException , InterruptedIOException {
String localID , lcID ;
I2PSocketImpl s ;
synchronized ( lock ) {
localID = makeID ( _outSockets ) ;
lcID = getReadableForm ( localID ) ;
s = new I2PSocketImpl ( peer , this , true , localID ) ;
_outSockets . put ( localID , s ) ;
}
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : connect( " + peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " , ...): localID = " + lcID ) ;
try {
ByteArrayOutputStream pubkey = new ByteArrayOutputStream ( ) ;
_session . getMyDestination ( ) . writeBytes ( pubkey ) ;
String remoteID ;
byte [ ] packet = makePacket ( ( byte ) SYN , localID , pubkey . toByteArray ( ) ) ;
boolean sent = false ;
sent = _session . sendMessage ( peer , packet ) ;
if ( ! sent ) {
_log . info ( getName ( ) + " : Unable to send & receive ack for SYN packet for socket "
+ s + " with localID = " + lcID ) ;
synchronized ( lock ) {
_outSockets . remove ( s . getLocalID ( ) ) ;
}
_context . statManager ( ) . addRateData ( " streaming.synNoAck " , 1 , 1 ) ;
throw new I2PException ( " Error sending through I2P network " ) ;
} else {
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : syn sent ok to "
+ peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " with localID = " + lcID ) ;
}
if ( options ! = null )
remoteID = s . getRemoteID ( true , options . getConnectTimeout ( ) ) ;
else
remoteID = s . getRemoteID ( true , getDefaultOptions ( ) . getConnectTimeout ( ) ) ;
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : remoteID received from "
+ peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " : " + getReadableForm ( remoteID )
+ " with localID = " + lcID ) ;
if ( remoteID = = null ) {
_context . statManager ( ) . addRateData ( " streaming.nackReceived " , 1 , 1 ) ;
throw new ConnectException ( " Connection refused by peer for socket " + s ) ;
}
if ( " " . equals ( remoteID ) ) {
_context . statManager ( ) . addRateData ( " streaming.synNoAck " , 1 , 1 ) ;
throw new NoRouteToHostException ( " Unable to reach peer for socket " + s ) ;
}
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : TIMING: s given out for remoteID "
+ getReadableForm ( remoteID ) + " for socket " + s ) ;
return s ;
} catch ( InterruptedIOException ioe ) {
if ( _log . shouldLog ( Log . WARN ) )
_log . warn ( getName ( ) + " : Timeout waiting for ack from syn for id "
+ lcID + " to " + peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " for socket " + s , ioe ) ;
synchronized ( lock ) {
_outSockets . remove ( s . getLocalID ( ) ) ;
}
s . internalClose ( ) ;
_context . statManager ( ) . addRateData ( " streaming.synNoAck " , 1 , 1 ) ;
throw new InterruptedIOException ( " Timeout waiting for ack " ) ;
} catch ( ConnectException ex ) {
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : Connection error waiting for ack from syn for id "
+ lcID + " to " + peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " for socket " + s , ex ) ;
s . internalClose ( ) ;
throw ex ;
} catch ( NoRouteToHostException ex ) {
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : No route to host waiting for ack from syn for id "
+ lcID + " to " + peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " for socket " + s , ex ) ;
s . internalClose ( ) ;
throw ex ;
} catch ( IOException ex ) {
if ( _log . shouldLog ( Log . WARN ) )
_log . warn ( getName ( ) + " : Error sending syn on id "
+ lcID + " to " + peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " for socket " + s , ex ) ;
synchronized ( lock ) {
_outSockets . remove ( s . getLocalID ( ) ) ;
}
s . internalClose ( ) ;
throw new I2PException ( " Unhandled IOException occurred " ) ;
} catch ( I2PException ex ) {
if ( _log . shouldLog ( Log . INFO ) )
_log . info ( getName ( ) + " : Error sending syn on id "
+ lcID + " to " + peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " for socket " + s , ex ) ;
synchronized ( lock ) {
_outSockets . remove ( s . getLocalID ( ) ) ;
}
s . internalClose ( ) ;
throw ex ;
} catch ( Exception e ) {
s . internalClose ( ) ;
_log . warn ( getName ( ) + " : Unhandled error connecting on "
+ lcID + " to " + peer . calculateHash ( ) . toBase64 ( ) . substring ( 0 , 6 )
+ " for socket " + s , e ) ;
throw new ConnectException ( " Unhandled error connecting: " + e . getMessage ( ) ) ;
}
}
/ * *
* Create a new connected socket ( block until the socket is created )
*
* @param peer Destination to connect to
*
* @throws ConnectException if the peer refuses the connection
* @throws NoRouteToHostException if the peer is not found or not reachable
* @throws InterruptedIOException if the connection timeouts
* @throws I2PException if there is some other I2P - related problem
* /
public I2PSocket connect ( Destination peer ) throws I2PException , ConnectException ,
NoRouteToHostException , InterruptedIOException {
return connect ( peer , null ) ;
}
/ * *
* Destroy the socket manager , freeing all the associated resources . This
* method will block untill all the managed sockets are closed .
*
* /
public void destroySocketManager ( ) {
if ( _serverSocket ! = null ) {
_serverSocket . close ( ) ;
_serverSocket = null ;
}
synchronized ( lock ) {
Iterator iter ;
String id = null ;
I2PSocketImpl sock ;
iter = _inSockets . keySet ( ) . iterator ( ) ;
while ( iter . hasNext ( ) ) {
id = ( String ) iter . next ( ) ;
sock = ( I2PSocketImpl ) _inSockets . get ( id ) ;
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : Closing inSocket \" "
+ getReadableForm ( sock . getLocalID ( ) ) + " \" " ) ;
sock . internalClose ( ) ;
}
iter = _outSockets . keySet ( ) . iterator ( ) ;
while ( iter . hasNext ( ) ) {
id = ( String ) iter . next ( ) ;
sock = ( I2PSocketImpl ) _outSockets . get ( id ) ;
if ( _log . shouldLog ( Log . DEBUG ) )
_log . debug ( getName ( ) + " : Closing outSocket \" "
+ getReadableForm ( sock . getLocalID ( ) ) + " \" " ) ;
sock . internalClose ( ) ;
}
}
_log . debug ( getName ( ) + " : Waiting for all open sockets to really close... " ) ;
synchronized ( lock ) {
while ( ( _inSockets . size ( ) ! = 0 ) | | ( _outSockets . size ( ) ! = 0 ) ) {
try {
lock . wait ( ) ;
} catch ( InterruptedException e ) { }
}
}
try {
_log . debug ( getName ( ) + " : Destroying I2P session... " ) ;
_session . destroySession ( ) ;
_log . debug ( getName ( ) + " : I2P session destroyed " ) ;
} catch ( I2PSessionException e ) {
_log . warn ( getName ( ) + " : Error destroying I2P session " , e ) ;
}
}
/ * *
* Retrieve a set of currently connected I2PSockets , either initiated locally or remotely .
*
* /
public Set listSockets ( ) {
Set sockets = new HashSet ( 8 ) ;
synchronized ( lock ) {
sockets . addAll ( _inSockets . values ( ) ) ;
sockets . addAll ( _outSockets . values ( ) ) ;
}
return sockets ;
}
/ * *
* Ping the specified peer , returning true if they replied to the ping within
* the timeout specified , false otherwise . This call blocks .
*
* /
public boolean ping ( Destination peer , long timeoutMs ) {
try {
return _session . sendMessage ( peer , new byte [ ] { ( byte ) CHAFF } ) ;
} catch ( I2PException ex ) {
_log . warn ( getName ( ) + " : I2PException: " , ex ) ;
return false ;
}
}
public void removeSocket ( I2PSocketImpl sock ) {
String localId = sock . getLocalID ( ) ;
boolean removed = false ;
synchronized ( lock ) {
removed = ( null ! = _inSockets . remove ( localId ) ) ;
removed = removed | | ( null ! = _outSockets . remove ( localId ) ) ;
lock . notify ( ) ;
}
long now = _context . clock ( ) . now ( ) ;
long lifetime = now - sock . getCreatedOn ( ) ;
long timeSinceClose = now - sock . getClosedOn ( ) ;
long sent = sock . getBytesSent ( ) ;
long recv = sock . getBytesReceived ( ) ;
if ( _log . shouldLog ( Log . DEBUG ) ) {
_log . debug ( getName ( ) + " : Removing socket \" " + getReadableForm ( localId ) + " \" [ " + sock
+ " , send: " + sent + " , recv: " + recv
+ " , lifetime: " + lifetime + " ms, time since close: " + timeSinceClose
+ " removed? " + removed + " )] " ,
new Exception ( " removeSocket called " ) ) ;
}
_context . statManager ( ) . addRateData ( " streaming.lifetime " , lifetime , lifetime ) ;
_context . statManager ( ) . addRateData ( " streaming.sent " , sent , lifetime ) ;
_context . statManager ( ) . addRateData ( " streaming.received " , recv , lifetime ) ;
if ( sent > recv ) {
_context . statManager ( ) . addRateData ( " streaming.transferBalance " , 1 , lifetime ) ;
} else if ( recv > sent ) {
_context . statManager ( ) . addRateData ( " streaming.transferBalance " , - 1 , lifetime ) ;
} else {
// noop
}
}
public String getName ( ) { return _name ; }
public void setName ( String name ) { _name = name ; }
2005-01-26 06:29:17 +00:00
public void addDisconnectListener ( I2PSocketManager . DisconnectListener lsnr ) {
2005-01-06 00:17:53 +00:00
synchronized ( _listeners ) {
_listeners . add ( lsnr ) ;
}
}
2005-01-26 06:29:17 +00:00
public void removeDisconnectListener ( I2PSocketManager . DisconnectListener lsnr ) {
2005-01-06 00:17:53 +00:00
synchronized ( _listeners ) {
_listeners . remove ( lsnr ) ;
}
}
2004-10-24 01:42:34 +00:00
public static String getReadableForm ( String id ) {
if ( id = = null ) return " (null) " ;
if ( id . length ( ) ! = 3 ) return " Bogus " ;
return Base64 . encode ( toBytes ( id ) ) ;
}
/ * *
* Create a new part the connection ID that is locally unique
*
* @param uniqueIn map of already known local IDs so we don ' t collide . WARNING - NOT THREADSAFE !
* /
private static String makeID ( HashMap uniqueIn ) {
String newID ;
do {
int id = ( int ) ( Math . random ( ) * 16777215 + 1 ) ;
byte [ ] nid = new byte [ 3 ] ;
nid [ 0 ] = ( byte ) ( id / 65536 ) ;
nid [ 1 ] = ( byte ) ( ( id / 256 ) % 256 ) ;
nid [ 2 ] = ( byte ) ( id % 256 ) ;
newID = toString ( nid ) ;
} while ( uniqueIn . get ( newID ) ! = null ) ;
return newID ;
}
/ * *
* Create a new packet of the given type for the specified connection containing
* the given payload
* /
public static byte [ ] makePacket ( byte type , String id , byte [ ] payload ) {
byte [ ] packet = new byte [ payload . length + 4 ] ;
packet [ 0 ] = type ;
byte [ ] temp = toBytes ( id ) ;
if ( temp . length ! = 3 ) throw new RuntimeException ( " Incorrect ID length: " + temp . length ) ;
System . arraycopy ( temp , 0 , packet , 1 , 3 ) ;
System . arraycopy ( payload , 0 , packet , 4 , payload . length ) ;
return packet ;
}
private static final String toString ( byte data [ ] ) {
try {
return new String ( data , " ISO-8859-1 " ) ;
} catch ( UnsupportedEncodingException uee ) {
throw new RuntimeException ( " WTF! iso-8859-1 isn't supported? " ) ;
}
}
private static final byte [ ] toBytes ( String str ) {
try {
return str . getBytes ( " ISO-8859-1 " ) ;
} catch ( UnsupportedEncodingException uee ) {
throw new RuntimeException ( " WTF! iso-8859-1 isn't supported? " ) ;
}
}
}