the following isn't the end of the 0.4.1 updates, as there are still more things left to clean

up and debug in the new tcp transport, but it all works, and i dont like having big changes
sitting on my local machine (and there's no real need for branching atm)
2004-09-26  jrandom
    * Complete rewrite of the TCP transport with IP autodetection and
      low CPU overhead reconnections.  More concise connectivity errors
      are listed on the /oldconsole.jsp as well.  The IP autodetection works
      by listening to the first person who tells you what your IP address is
      when you have not defined one yourself and you have no other TCP
      connections.
    * Update to the I2NP message format to add transparent verification at
      the I2NP level (beyond standard TCP verification).
    * Remove a potential weakness in our AESEngine's safeEncrypt and safeDecrypt
      implementation (rather than verifying with E(H(key)), we now verify with
      E(H(iv))).
    * The above changes are NOT BACKWARDS COMPATIBLE.
    * Removed all of the old unused PHTTP code.
    * Refactor various methods and clean up some javadoc.
This commit is contained in:
jrandom
2004-09-26 15:16:44 +00:00
committed by zzz
parent 4c29c20613
commit b67b243ebd
26 changed files with 2885 additions and 2236 deletions

View File

@ -57,7 +57,7 @@
<target name="javadoc">
<mkdir dir="./build" />
<mkdir dir="./build/javadoc" />
<javadoc
<javadoc access="package"
destdir="./build/javadoc"
packagenames="*"
use="true"

View File

@ -58,7 +58,7 @@ public class AESEngine {
if ((iv == null) || (payload == null) || (sessionKey == null) || (iv.length != 16)) return null;
ByteArrayOutputStream baos = new ByteArrayOutputStream(paddedSize + 64);
Hash h = _context.sha().calculateHash(sessionKey.getData());
Hash h = _context.sha().calculateHash(iv);
try {
h.writeBytes(baos);
DataHelper.writeLong(baos, 4, payload.length);
@ -84,7 +84,7 @@ public class AESEngine {
return null;
}
ByteArrayInputStream bais = new ByteArrayInputStream(decr);
Hash h = _context.sha().calculateHash(sessionKey.getData());
Hash h = _context.sha().calculateHash(iv);
try {
Hash rh = new Hash();
rh.readBytes(bais);

View File

@ -13,8 +13,13 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey;
import net.i2p.util.Clock;
import net.i2p.util.I2PThread;
@ -135,7 +140,62 @@ public class DHSessionKeyBuilder {
_sessionKey = null;
_extraExchangedBytes = new ByteArray();
}
/**
* Conduct a DH exchange over the streams, returning the resulting data.
*
* @return exchanged data
* @throws IOException if there is an error (but does not close the streams
*/
public static DHSessionKeyBuilder exchangeKeys(InputStream in, OutputStream out) throws IOException {
DHSessionKeyBuilder builder = new DHSessionKeyBuilder();
// send: X
writeBigI(out, builder.getMyPublicValue());
// read: Y
BigInteger Y = readBigI(in);
if (Y == null) return null;
builder.setPeerPublicValue(Y);
return builder;
}
static BigInteger readBigI(InputStream in) throws IOException {
byte Y[] = new byte[256];
int read = DataHelper.read(in, Y);
if (read != 256) {
return null;
}
if (1 == (Y[0] & 0x80)) {
// high bit set, need to inject an additional byte to keep 2s complement
if (_log.shouldLog(Log.DEBUG))
_log.debug("High bit set");
byte Y2[] = new byte[257];
System.arraycopy(Y, 0, Y2, 1, 256);
Y = Y2;
}
return new NativeBigInteger(Y);
}
/**
* Write out the integer as a 256 byte value. This left pads with 0s so
* to keep in 2s complement, and if it is already 257 bytes (due to
* the sign bit) ignore that first byte.
*/
static void writeBigI(OutputStream out, BigInteger val) throws IOException {
byte x[] = val.toByteArray();
for (int i = x.length; i < 256; i++)
out.write(0);
if (x.length == 257)
out.write(x, 1, 256);
else if (x.length == 256)
out.write(x);
else if (x.length > 257)
throw new IllegalArgumentException("Value is too large! length="+x.length);
out.flush();
}
private static final int getSize() {
synchronized (_builders) {
return _builders.size();

View File

@ -99,24 +99,28 @@ public class DataHelper {
*/
public static void writeProperties(OutputStream rawStream, Properties props)
throws DataFormatException, IOException {
OrderedProperties p = new OrderedProperties();
if (props != null) p.putAll(props);
ByteArrayOutputStream baos = new ByteArrayOutputStream(32);
for (Iterator iter = p.keySet().iterator(); iter.hasNext();) {
String key = (String) iter.next();
String val = p.getProperty(key);
// now make sure they're in UTF-8
//key = new String(key.getBytes(), "UTF-8");
//val = new String(val.getBytes(), "UTF-8");
writeString(baos, key);
baos.write(_equalBytes);
writeString(baos, val);
baos.write(_semicolonBytes);
if (props != null) {
OrderedProperties p = new OrderedProperties();
p.putAll(props);
ByteArrayOutputStream baos = new ByteArrayOutputStream(32);
for (Iterator iter = p.keySet().iterator(); iter.hasNext();) {
String key = (String) iter.next();
String val = p.getProperty(key);
// now make sure they're in UTF-8
//key = new String(key.getBytes(), "UTF-8");
//val = new String(val.getBytes(), "UTF-8");
writeString(baos, key);
baos.write(_equalBytes);
writeString(baos, val);
baos.write(_semicolonBytes);
}
baos.close();
byte propBytes[] = baos.toByteArray();
writeLong(rawStream, 2, propBytes.length);
rawStream.write(propBytes);
} else {
writeLong(rawStream, 2, 0);
}
baos.close();
byte propBytes[] = baos.toByteArray();
writeLong(rawStream, 2, propBytes.length);
rawStream.write(propBytes);
}
/**

View File

@ -322,6 +322,22 @@ public class RouterInfo extends DataStructureImpl {
return true;
}
/**
* Pull the first workable target address for the given transport
*
*/
public RouterAddress getTargetAddress(String transportStyle) {
synchronized (_addresses) {
for (Iterator iter = _addresses.iterator(); iter.hasNext(); ) {
RouterAddress addr = (RouterAddress)iter.next();
if (addr.getTransportStyle().equals(transportStyle))
return addr;
}
}
return null;
}
/**
* Actually validate the signature

View File

@ -1,4 +1,20 @@
$Id: history.txt,v 1.18 2004/09/16 18:55:12 jrandom Exp $
$Id: history.txt,v 1.19 2004/09/21 19:10:26 jrandom Exp $
2004-09-26 jrandom
* Complete rewrite of the TCP transport with IP autodetection and
low CPU overhead reconnections. More concise connectivity errors
are listed on the /oldconsole.jsp as well. The IP autodetection works
by listening to the first person who tells you what your IP address is
when you have not defined one yourself and you have no other TCP
connections.
* Update to the I2NP message format to add transparent verification at
the I2NP level (beyond standard TCP verification).
* Remove a potential weakness in our AESEngine's safeEncrypt and safeDecrypt
implementation (rather than verifying with E(H(key)), we now verify with
E(H(iv))).
* The above changes are NOT BACKWARDS COMPATIBLE.
* Removed all of the old unused PHTTP code.
* Refactor various methods and clean up some javadoc.
2004-09-21 jrandom
* Have two tiers of hosts.txt files - the standard "hosts.txt" and

View File

@ -50,4 +50,7 @@ public interface I2NPMessage extends DataStructure {
*
*/
public Date getMessageExpiration();
/** How large the message is, including any checksums */
public int getSize();
}

View File

@ -8,6 +8,7 @@ package net.i2p.data.i2np;
*
*/
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -17,6 +18,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.DataStructureImpl;
import net.i2p.data.Hash;
import net.i2p.util.Log;
/**
@ -72,12 +74,23 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
type = (int)DataHelper.readLong(in, 1);
_uniqueId = DataHelper.readLong(in, 4);
_expiration = DataHelper.readDate(in);
int size = (int)DataHelper.readLong(in, 2);
Hash h = new Hash();
h.readBytes(in);
byte data[] = new byte[size];
int read = DataHelper.read(in, data);
if (read != size)
throw new I2NPMessageException("Payload is too short [" + read + ", wanted " + size + "]");
Hash calc = _context.sha().calculateHash(data);
if (!calc.equals(h))
throw new I2NPMessageException("Hash does not match");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reading bytes: type = " + type + " / uniqueId : " + _uniqueId + " / expiration : " + _expiration);
readMessage(new ByteArrayInputStream(data), type);
} catch (DataFormatException dfe) {
throw new I2NPMessageException("Error reading the message header", dfe);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reading bytes: type = " + type + " / uniqueId : " + _uniqueId + " / expiration : " + _expiration);
readMessage(in, type);
}
public void writeBytes(OutputStream out) throws DataFormatException, IOException {
try {
@ -87,6 +100,9 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
if (_log.shouldLog(Log.DEBUG))
_log.debug("Writing bytes: type = " + getType() + " / uniqueId : " + _uniqueId + " / expiration : " + _expiration);
byte[] data = writeMessage();
DataHelper.writeLong(out, 2, data.length);
Hash h = _context.sha().calculateHash(data);
h.writeBytes(out);
out.write(data);
} catch (I2NPMessageException ime) {
throw new DataFormatException("Error writing out the I2NP message data", ime);
@ -105,4 +121,15 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
*/
public Date getMessageExpiration() { return _expiration; }
public void setMessageExpiration(Date exp) { _expiration = exp; }
public int getSize() {
try {
byte msg[] = writeMessage();
return msg.length + 43;
} catch (IOException ioe) {
return 0;
} catch (I2NPMessageException ime) {
return 0;
}
}
}

View File

@ -30,6 +30,7 @@ import net.i2p.crypto.DHSessionKeyBuilder;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterInfo;
import net.i2p.data.SigningPrivateKey;
import net.i2p.data.i2np.GarlicMessage;
import net.i2p.data.i2np.TunnelMessage;
import net.i2p.router.message.GarlicMessageHandler;
@ -220,6 +221,38 @@ public class Router {
}
public boolean isAlive() { return _isAlive; }
/**
* Rebuild and republish our routerInfo since something significant
* has changed.
*/
public void rebuildRouterInfo() {
if (_log.shouldLog(Log.INFO))
_log.info("Rebuilding new routerInfo");
RouterInfo ri = null;
if (_routerInfo != null)
ri = new RouterInfo(_routerInfo);
else
ri = new RouterInfo();
try {
ri.setPublished(_context.clock().now());
Properties stats = _context.statPublisher().publishStatistics();
ri.setOptions(stats);
ri.setAddresses(_context.commSystem().createAddresses());
SigningPrivateKey key = _context.keyManager().getSigningPrivateKey();
if (key == null) {
_log.log(Log.CRIT, "Internal error - signing private key not known? wtf");
return;
}
ri.sign(key);
setRouterInfo(ri);
_context.netDb().publish(ri);
} catch (DataFormatException dfe) {
_log.log(Log.CRIT, "Internal error - unable to sign our own address?!", dfe);
}
}
/**
* coallesce the stats framework every minute

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.31 $ $Date: 2004/09/16 18:55:12 $";
public final static String ID = "$Revision: 1.32 $ $Date: 2004/09/21 19:10:26 $";
public final static String VERSION = "0.4.0.1";
public final static long BUILD = 3;
public final static long BUILD = 4;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -19,7 +19,6 @@ import net.i2p.data.RouterAddress;
import net.i2p.router.CommSystemFacade;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.phttp.PHTTPTransport;
import net.i2p.router.transport.tcp.TCPTransport;
import net.i2p.util.Log;
@ -70,9 +69,6 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public Set createAddresses() {
Set addresses = new HashSet();
RouterAddress addr = createTCPAddress();
if (addr != null)
addresses.add(addr);
addr = createPHTTPAddress();
if (addr != null)
addresses.add(addr);
if (_log.shouldLog(Log.INFO))
@ -82,8 +78,6 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
private final static String PROP_I2NP_TCP_HOSTNAME = "i2np.tcp.hostname";
private final static String PROP_I2NP_TCP_PORT = "i2np.tcp.port";
private final static String PROP_I2NP_PHTTP_SEND_URL = "i2np.phttp.sendURL";
private final static String PROP_I2NP_PHTTP_REGISTER_URL = "i2np.phttp.registerURL";
private RouterAddress createTCPAddress() {
RouterAddress addr = new RouterAddress();
@ -104,23 +98,4 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
addr.setTransportStyle(TCPTransport.STYLE);
return addr;
}
private RouterAddress createPHTTPAddress() {
RouterAddress addr = new RouterAddress();
addr.setCost(50);
addr.setExpiration(null);
Properties props = new Properties();
String regURL = _context.router().getConfigSetting(PROP_I2NP_PHTTP_REGISTER_URL);
String sendURL = _context.router().getConfigSetting(PROP_I2NP_PHTTP_SEND_URL);
if ( (regURL == null) || (sendURL == null) ) {
_log.info("Polling HTTP registration/send URLs not specified in config file - skipping PHTTP transport");
return null;
} else {
_log.info("Creating Polling HTTP address on " + regURL + " / " + sendURL);
}
props.setProperty(PHTTPTransport.PROP_TO_REGISTER_URL, regURL);
props.setProperty(PHTTPTransport.PROP_TO_SEND_URL, sendURL);
addr.setOptions(props);
addr.setTransportStyle(PHTTPTransport.STYLE);
return addr;
}
}

View File

@ -37,6 +37,10 @@ public abstract class TransportImpl implements Transport {
private List _sendPool;
protected RouterContext _context;
/**
* Initialize the new transport
*
*/
public TransportImpl(RouterContext context) {
_context = context;
_log = _context.logManager().getLog(TransportImpl.class);
@ -52,8 +56,18 @@ public abstract class TransportImpl implements Transport {
_currentAddresses = new HashSet();
}
/**
* How many peers can we talk to right now?
*
*/
public int countActivePeers() { return 0; }
/**
* Nonblocking call to pull the next outbound message
* off the queue.
*
* @return the next message or null if none are available
*/
public OutNetMessage getNextMessage() {
OutNetMessage msg = null;
synchronized (_sendPool) {
@ -64,16 +78,45 @@ public abstract class TransportImpl implements Transport {
return msg;
}
public void afterSend(OutNetMessage msg, boolean sendSuccessful) {
/**
* The transport is done sending this message
*
* @param msg message in question
* @param sendSuccessful true if the peer received it
*/
protected void afterSend(OutNetMessage msg, boolean sendSuccessful) {
afterSend(msg, sendSuccessful, true, 0);
}
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue) {
/**
* The transport is done sending this message
*
* @param msg message in question
* @param sendSuccessful true if the peer received it
* @param allowRequeue true if we should try other transports if available
*/
protected void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue) {
afterSend(msg, sendSuccessful, allowRequeue, 0);
}
public void afterSend(OutNetMessage msg, boolean sendSuccessful, long msToSend) {
/**
* The transport is done sending this message
*
* @param msg message in question
* @param sendSuccessful true if the peer received it
* @param msToSend how long it took to transfer the data to the peer
*/
protected void afterSend(OutNetMessage msg, boolean sendSuccessful, long msToSend) {
afterSend(msg, sendSuccessful, true, msToSend);
}
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
/**
* The transport is done sending this message. This is the method that actually
* does all of the cleanup - firing off jobs, requeueing, updating stats, etc.
*
* @param msg message in question
* @param sendSuccessful true if the peer received it
* @param msToSend how long it took to transfer the data to the peer
* @param allowRequeue true if we should try other transports if available
*/
protected void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
boolean log = false;
msg.timestamp("afterSend(" + sendSuccessful + ")");
@ -225,6 +268,10 @@ public abstract class TransportImpl implements Transport {
*/
protected abstract void outboundMessageReady();
/**
* Message received from the I2NPMessageReader - send it to the listener
*
*/
public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) {
int level = Log.INFO;
if (msToReceive > 5000)
@ -273,25 +320,17 @@ public abstract class TransportImpl implements Transport {
_log.error("WTF! Null listener! this = " + toString(), new Exception("Null listener"));
}
}
/**
* Pull the first workable target address for this transport
*
*/
protected RouterAddress getTargetAddress(RouterInfo address) {
if (address == null) return null;
for (Iterator iter = address.getAddresses().iterator(); iter.hasNext(); ) {
RouterAddress addr = (RouterAddress)iter.next();
if (getStyle().equals(addr.getTransportStyle()))
return addr;
}
return null;
}
/** What addresses are we currently listening to? */
public Set getCurrentAddresses() { return _currentAddresses; }
/** Add an address to our listening set */
protected void addCurrentAddress(RouterAddress address) { _currentAddresses.add(address); }
/** Remove an address from our listening set */
protected void removeCurrentAddress(RouterAddress address) { _currentAddresses.remove(address); }
/** Who to notify on message availability */
public void setListener(TransportEventListener listener) { _listener = listener; }
/** Make this stuff pretty (only used in the old console) */
public String renderStatusHTML() { return null; }
protected RouterContext getContext() { return _context; }
}

View File

@ -30,7 +30,6 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.InNetMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.phttp.PHTTPTransport;
import net.i2p.router.transport.tcp.TCPTransport;
import net.i2p.util.Log;
@ -62,31 +61,14 @@ public class TransportManager implements TransportEventListener {
}
private void configTransports() {
RouterIdentity ident = _context.router().getRouterInfo().getIdentity();
Set addresses = _context.commSystem().createAddresses();
RouterAddress tcpAddr = null;
RouterAddress phttpAddr = null;
for (Iterator iter = addresses.iterator(); iter.hasNext();) {
RouterAddress addr = (RouterAddress)iter.next();
if (TCPTransport.STYLE.equals(addr.getTransportStyle())) {
tcpAddr = addr;
}
if (PHTTPTransport.STYLE.equals(addr.getTransportStyle())) {
phttpAddr = addr;
}
}
String disableTCP = _context.router().getConfigSetting(PROP_DISABLE_TCP);
if ( (disableTCP != null) && (Boolean.TRUE.toString().equalsIgnoreCase(disableTCP)) ) {
_log.info("Explicitly disabling the TCP transport!");
} else {
Transport t = new TCPTransport(_context, tcpAddr);
Transport t = new TCPTransport(_context);
t.setListener(this);
_transports.add(t);
}
Transport t = new PHTTPTransport(_context, phttpAddr);
t.setListener(this);
_transports.add(t);
}
public void startListening() {

View File

@ -0,0 +1,740 @@
package net.i2p.router.transport.tcp;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.Socket;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.i2p.crypto.AESInputStream;
import net.i2p.crypto.AESOutputStream;
import net.i2p.crypto.DHSessionKeyBuilder;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.NativeBigInteger;
import net.i2p.util.SimpleTimer;
/**
* Class responsible for all of the handshaking necessary to establish a
* connection with a peer.
*
*/
public class ConnectionBuilder {
private Log _log;
private RouterContext _context;
private TCPTransport _transport;
/** who we're trying to talk with */
private RouterInfo _target;
/** who we're actually talking with */
private RouterInfo _actualPeer;
/** raw socket to the peer */
private Socket _socket;
/** raw stream to read from the peer */
private InputStream _rawIn;
/** raw stream to write to the peer */
private OutputStream _rawOut;
/** secure stream to read from the peer */
private InputStream _connectionIn;
/** secure stream to write to the peer */
private OutputStream _connectionOut;
/** protocol version agreed to, or -1 */
private int _agreedProtocol;
/** IP address the peer says we are at */
private String _localIP;
/** IP address of the peer we connected to */
private TCPAddress _remoteAddress;
/** connection tag to identify ourselves, or null if no known tag is available */
private ByteArray _connectionTag;
/** connection tag to identify ourselves next time */
private ByteArray _nextConnectionTag;
/** nonce the peer gave us */
private ByteArray _nonce;
/** key that we will be encrypting comm with */
private SessionKey _key;
/** initialization vector for the encryption */
private byte[] _iv;
/**
* Contains a message describing why the connection failed (or null if it
* succeeded). This should include a timestamp of some sort.
*/
private String _error;
/** If the connection hasn't been built in 10 seconds, give up */
public static final int CONNECTION_TIMEOUT = 10*1000;
public ConnectionBuilder(RouterContext context, TCPTransport transport, RouterInfo info) {
_context = context;
_log = context.logManager().getLog(ConnectionBuilder.class);
_transport = transport;
_target = info;
_error = null;
_agreedProtocol = -1;
}
/**
* Blocking call to establish a TCP connection to the given peer through a
* brand new socket.
*
* @return fully established but not yet running connection, or null on error
*/
public TCPConnection establishConnection() {
SimpleTimer.getInstance().addEvent(new DieIfTooSlow(), CONNECTION_TIMEOUT);
try {
return doEstablishConnection();
} catch (Exception e) { // catchall in case the timeout gets us flat footed
_log.error("Error connecting", e);
return null;
}
}
private TCPConnection doEstablishConnection() {
createSocket();
if ( (_socket == null) || (_error != null) )
return null;
negotiateProtocol();
if ( (_agreedProtocol < 0) || (_error != null) )
return null;
boolean ok = false;
if (_connectionTag != null)
ok = connectExistingSession();
else
ok = connectNewSession();
if (_log.shouldLog(Log.DEBUG))
_log.debug("connection ok? " + ok + " error: " + _error);
if (ok && (_error == null) ) {
establishComplete();
TCPConnection con = new TCPConnection(_context);
con.setInputStream(_connectionIn);
con.setOutputStream(_connectionOut);
con.setSocket(_socket);
con.setRemoteRouterIdentity(_actualPeer.getIdentity());
con.setRemoteAddress(_remoteAddress);
if (_error == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Establishment successful! returning the con");
return con;
} else {
return null;
}
} else {
return null;
}
}
/**
* Agree on what protocol to communicate with, and set _agreedProtocol
* accordingly. If no common protocols are available, disconnect, set
* _agreedProtocol to -1, and update the _error accordingly.
*/
private void negotiateProtocol() {
ConnectionTagManager mgr = _transport.getTagManager();
ByteArray tag = mgr.getTag(_target.getIdentity().getHash());
_key = mgr.getKey(_target.getIdentity().getHash());
_connectionTag = tag;
boolean ok = sendPreferredProtocol();
if (!ok) return;
ok = receiveAgreedProtocol();
if (!ok) return;
}
/**
* Send <code>#bytesFollowing + #versions + v1 [+ v2 [etc]] +
* tag? + tagData + properties</code>
*/
private boolean sendPreferredProtocol() {
try {
// #bytesFollowing + #versions + v1 [+ v2 [etc]] + tag? + tagData + properties
ByteArrayOutputStream baos = new ByteArrayOutputStream(64);
DataHelper.writeLong(baos, 1, TCPTransport.SUPPORTED_PROTOCOLS.length);
for (int i = 0; i < TCPTransport.SUPPORTED_PROTOCOLS.length; i++) {
DataHelper.writeLong(baos, 1, TCPTransport.SUPPORTED_PROTOCOLS[i]);
}
if (_connectionTag != null) {
baos.write(0x1);
baos.write(_connectionTag.getData());
} else {
baos.write(0x0);
}
DataHelper.writeProperties(baos, null); // no options atm
byte line[] = baos.toByteArray();
DataHelper.writeLong(_rawOut, 2, line.length);
_rawOut.write(line);
_rawOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SendProtocol[X]: tag: "
+ (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : "none")
+ " socket: " + _socket);
return true;
} catch (IOException ioe) {
fail("Error sending our handshake to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error sending our handshake to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + dfe.getMessage(), dfe);
return false;
}
}
/**
* Receive <code>#bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties</code>
*
*/
private boolean receiveAgreedProtocol() {
try {
// #bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties
int numBytes = (int)DataHelper.readLong(_rawIn, 2);
// 0xFF is a reserved value
if ( (numBytes <= 0) || (numBytes >= 0xFF) )
throw new IOException("Invalid number of bytes in response");
byte line[] = new byte[numBytes];
int read = DataHelper.read(_rawIn, line);
if (read != numBytes) {
fail("Handshake too short with "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("ReadProtocol1[X]: "
+ "\nLine: " + Base64.encode(line));
ByteArrayInputStream bais = new ByteArrayInputStream(line);
int version = (int)DataHelper.readLong(bais, 1);
for (int i = 0; i < TCPTransport.SUPPORTED_PROTOCOLS.length; i++) {
if (version == TCPTransport.SUPPORTED_PROTOCOLS[i]) {
_agreedProtocol = version;
break;
}
}
if (_agreedProtocol == -1) {
fail("No valid protocol versions to contact "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
int bytesInIP = (int)DataHelper.readLong(bais, 1);
byte ip[] = new byte[bytesInIP];
DataHelper.read(bais, ip); // ignore return value, this is an array
_localIP = new String(ip);
// if we don't already know our IP address, this may cause us
// to fire up a socket listener, so may take a few seconds.
_transport.ourAddressReceived(_localIP);
int tagOk = (int)DataHelper.readLong(bais, 1);
if ( (tagOk == 0x01) && (_connectionTag != null) ) {
// tag is ok
} else {
_connectionTag = null;
_key = null;
}
byte nonce[] = new byte[4];
read = DataHelper.read(bais, nonce);
if (read != 4) {
fail("No nonce specified by "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
_nonce = new ByteArray(nonce);
Properties opts = DataHelper.readProperties(bais);
if (_log.shouldLog(Log.DEBUG))
_log.debug("ReadProtocol[X]: agreed=" + _agreedProtocol + " nonce: "
+ Base64.encode(nonce) + " tag: "
+ (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : "none")
+ " props: " + opts
+ " socket: " + _socket
+ "\nLine: " + Base64.encode(line));
// we dont care about any of the properties, so we can just
// ignore it, and we're done with this step
return true;
} catch (IOException ioe) {
fail("Error reading the handshake from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error reading the handshake from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + dfe.getMessage(), dfe);
return false;
}
}
/** Set the next tag to <code>H(E(nonce + tag, sessionKey))</code> */
private void updateNextTagExisting() {
byte pre[] = new byte[48];
byte encr[] = _context.AESEngine().encrypt(pre, _key, _iv);
Hash h = _context.sha().calculateHash(encr);
_nextConnectionTag = new ByteArray(h.getData());
}
/**
* We have a valid tag, so use it to do the handshaking. On error, fail()
* appropriately.
*
* @return true if the connection went ok, or false if it failed.
*/
private boolean connectExistingSession() {
// iv to the SHA256 of the tag appended by the nonce.
byte data[] = new byte[36];
System.arraycopy(_connectionTag.getData(), 0, data, 0, 32);
System.arraycopy(_nonce.getData(), 0, data, 32, 4);
Hash h = _context.sha().calculateHash(data);
_iv = new byte[16];
System.arraycopy(h.getData(), 0, _iv, 0, 16);
updateNextTagExisting();
_rawOut = new AESOutputStream(_context, _rawOut, _key, _iv);
_rawIn = new AESInputStream(_context, _rawIn, _key, _iv);
// send: H(nonce)
try {
h = _context.sha().calculateHash(_nonce.getData());
h.writeBytes(_rawOut);
_rawOut.flush();
} catch (IOException ioe) {
fail("Error writing the encrypted nonce to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + ioe.getMessage());
return false;
} catch (DataFormatException dfe) {
fail("Error writing the encrypted nonce to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + dfe.getMessage());
return false;
}
// read: H(tag)
try {
Hash readHash = new Hash();
readHash.readBytes(_rawIn);
Hash expectedHash = _context.sha().calculateHash(_connectionTag.getData());
if (!readHash.equals(expectedHash)) {
fail("Key verification failed with "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
} catch (IOException ioe) {
fail("Error reading the initial key verification from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + ioe.getMessage());
return false;
} catch (DataFormatException dfe) {
fail("Error reading the initial key verification from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + dfe.getMessage());
return false;
}
// send: routerInfo + currentTime + H(routerInfo + currentTime + nonce + tag)
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
_context.router().getRouterInfo().writeBytes(baos);
DataHelper.writeDate(baos, new Date(_context.clock().now()));
_rawOut.write(baos.toByteArray());
baos.write(_nonce.getData());
baos.write(_connectionTag.getData());
Hash verification = _context.sha().calculateHash(baos.toByteArray());
verification.writeBytes(_rawOut);
_rawOut.flush();
} catch (IOException ioe) {
fail("Error writing the verified info to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + ioe.getMessage());
return false;
} catch (DataFormatException dfe) {
fail("Error writing the verified info to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + dfe.getMessage());
return false;
}
// read: routerInfo + status + properties
// + H(routerInfo + status + properties + nonce + tag)
try {
RouterInfo peer = new RouterInfo();
peer.readBytes(_rawIn);
int status = (int)_rawIn.read() & 0xFF;
boolean ok = validateStatus(status);
if (!ok) return false;
Properties props = DataHelper.readProperties(_rawIn);
// ignore these now
Hash readHash = new Hash();
readHash.readBytes(_rawIn);
// H(routerInfo + status + properties + nonce + tag)
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
peer.writeBytes(baos);
baos.write(status);
DataHelper.writeProperties(baos, props);
baos.write(_nonce.getData());
baos.write(_connectionTag.getData());
Hash expectedHash = _context.sha().calculateHash(baos.toByteArray());
if (!expectedHash.equals(readHash)) {
fail("Error verifying info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ " (claiming to be "
+ peer.getIdentity().calculateHash().toBase64().substring(0,6)
+ ")");
return false;
}
_actualPeer = peer;
return true;
} catch (IOException ioe) {
fail("Error reading the verified info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + ioe.getMessage());
return false;
} catch (DataFormatException dfe) {
fail("Error reading the verified info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + dfe.getMessage());
return false;
}
}
/**
* We do not have a valid tag, so exchange a new one and then do the
* handshaking. On error, fail() appropriately.
*
* @return true if the connection went ok, or false if it failed.
*/
private boolean connectNewSession() {
DHSessionKeyBuilder builder = null;
try {
builder = DHSessionKeyBuilder.exchangeKeys(_rawIn, _rawOut);
} catch (IOException ioe) {
fail("Error exchanging keys with "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
// load up the key initialize the encrypted streams
_key = builder.getSessionKey();
byte extra[] = builder.getExtraBytes().getData();
_iv = new byte[16];
System.arraycopy(extra, 0, _iv, 0, 16);
byte nextTag[] = new byte[32];
System.arraycopy(extra, 16, nextTag, 0, 32);
_nextConnectionTag = new ByteArray(nextTag);
if (_log.shouldLog(Log.DEBUG))
_log.debug("\nNew session[X]: key=" + _key.toBase64() + " iv="
+ Base64.encode(_iv) + " nonce=" + Base64.encode(_nonce.getData())
+ " socket: " + _socket);
_rawOut = new AESOutputStream(_context, _rawOut, _key, _iv);
_rawIn = new AESInputStream(_context, _rawIn, _key, _iv);
// send: H(nonce)
try {
Hash h = _context.sha().calculateHash(_nonce.getData());
h.writeBytes(_rawOut);
_rawOut.flush();
} catch (IOException ioe) {
fail("Error writing the verification to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error writing the verification to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6), dfe);
return false;
}
// read: H(nextTag)
try {
byte val[] = new byte[32];
int read = DataHelper.read(_rawIn, val);
if (read != 32) {
fail("Not enough data to read the verification from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
Hash expected = _context.sha().calculateHash(_nextConnectionTag.getData());
if (!DataHelper.eq(expected.getData(), val)) {
fail("Verification failed from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
} catch (IOException ioe) {
fail("Error reading the verification from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6), ioe);
return false;
}
// send: routerInfo + currentTime
// + S(routerInfo + currentTime + nonce + nextTag, routerIdent.signingKey)
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
_context.router().getRouterInfo().writeBytes(baos);
DataHelper.writeDate(baos, new Date(_context.clock().now()));
_rawOut.write(baos.toByteArray());
baos.write(_nonce.getData());
baos.write(_nextConnectionTag.getData());
Signature sig = _context.dsa().sign(baos.toByteArray(),
_context.keyManager().getSigningPrivateKey());
sig.writeBytes(_rawOut);
_rawOut.flush();
} catch (IOException ioe) {
fail("Error sending the info to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
} catch (DataFormatException dfe) {
fail("Error sending the info to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
// read: routerInfo + status + properties
// + S(routerInfo + status + properties + nonce + nextTag, routerIdent.signingKey)
try {
RouterInfo peer = new RouterInfo();
peer.readBytes(_rawIn);
int status = (int)_rawIn.read() & 0xFF;
boolean ok = validateStatus(status);
if (!ok) return false;
Properties props = DataHelper.readProperties(_rawIn);
// ignore these now
Signature sig = new Signature();
sig.readBytes(_rawIn);
// S(routerInfo + status + properties + nonce + nextTag, routerIdent.signingKey)
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
peer.writeBytes(baos);
baos.write(status);
DataHelper.writeProperties(baos, props);
baos.write(_nonce.getData());
baos.write(_nextConnectionTag.getData());
ok = _context.dsa().verifySignature(sig, baos.toByteArray(),
peer.getIdentity().getSigningPublicKey());
if (!ok) {
fail("Error verifying info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ " (claiming to be "
+ peer.getIdentity().calculateHash().toBase64().substring(0,6)
+ ")");
return false;
}
_actualPeer = peer;
return true;
} catch (IOException ioe) {
fail("Error reading the verified info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + ioe.getMessage());
return false;
} catch (DataFormatException dfe) {
fail("Error reading the verified info from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + dfe.getMessage());
return false;
}
}
/**
* Is the given status value ok for an existing session?
*
* @return true if ok, false if fail()ed
*/
private boolean validateStatus(int status) {
switch (status) {
case -1:
fail("Error reading the status from "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
case 0: // ok
return true;
case 1: // not reachable
fail("According to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ", we are not reachable on " + _localIP);
return false;
case 2: // clock skew
fail("According to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ ", our clock is off");
return false;
case 3: // signature failure (only for new sessions)
fail("Signature failure talking to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
default: // unknown error
fail("Unknown error [" + status + "] connecting to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
}
}
/**
* Finish up the establishment (wrapping the streams, storing the netDb,
* persisting the connection tags, etc)
*
*/
private void establishComplete() {
// todo: add bw limiter
_connectionIn = _rawIn;
_connectionOut = _rawOut;
Hash peer = _actualPeer.getIdentity().getHash();
_context.netDb().store(peer, _actualPeer);
_transport.getTagManager().replaceTag(peer, _nextConnectionTag, _key);
}
/**
* Build a socket to the peer, and populate _socket, _rawIn, and _rawOut
* accordingly. On error or timeout, close and null them all and
* set _error.
*
*/
private void createSocket() {
CreateSocketRunner r = new CreateSocketRunner();
I2PThread t = new I2PThread(r);
t.start();
try { t.join(CONNECTION_TIMEOUT); } catch (InterruptedException ie) {}
if (!r.getCreated()) {
fail("Unable to establish a socket in time to "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
}
}
/** Brief description of why the connection failed (or null if it succeeded) */
public String getError() { return _error; }
/**
* Kill the builder, closing all sockets and streams, setting everything
* back to failure states, and setting the given error.
*
*/
private void fail(String error) {
fail(error, null);
}
private void fail(String error, Exception e) {
if (_error == null) // only grab the first error
_error = error;
if (_rawIn != null) try { _rawIn.close(); } catch (IOException ioe) {}
if (_rawOut != null) try { _rawOut.close(); } catch (IOException ioe) {}
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
_rawIn = null;
_rawOut = null;
_agreedProtocol = -1;
_nonce = null;
_connectionTag = null;
_actualPeer = null;
if (_log.shouldLog(Log.WARN))
_log.warn(error, e);
}
/**
* Lookup and establish a connection to the peer, exposing getCreate() == true
* once we are done. This allows for asynchronous timeouts without depending
* upon the interruptability of the socket (since it isn't open yet).
*
*/
private class CreateSocketRunner implements Runnable {
private boolean _created;
public CreateSocketRunner() {
_created = false;
}
public boolean getCreated() { return _created; }
public void run() {
RouterAddress addr = _target.getTargetAddress(_transport.getStyle());
if (addr == null) {
fail("Peer "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ " has no TCP addresses");
return;
}
TCPAddress tcpAddr = new TCPAddress(addr);
if (tcpAddr.getPort() <= 0) {
fail("Peer "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ " has an invalid TCP address");
return;
}
try {
_socket = new Socket(tcpAddr.getAddress(), tcpAddr.getPort());
_rawIn = _socket.getInputStream();
_rawOut = _socket.getOutputStream();
_error = null;
_remoteAddress = tcpAddr;
_created = true;
} catch (IOException ioe) {
fail("Error contacting "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6)
+ " on " + tcpAddr.toString() + ": " + ioe.getMessage());
return;
}
}
}
/**
* In addition to the socket creation timeout, we have a timed event for
* the overall connection establishment, killing everything if we haven't
* completed a connection yet.
*
*/
private class DieIfTooSlow implements SimpleTimer.TimedEvent {
public void timeReached() {
if ( (_actualPeer == null) && (_error == null) ) {
fail("Took too long to connect with "
+ _target.getIdentity().calculateHash().toBase64().substring(0,6));
}
}
}
}

View File

@ -0,0 +1,806 @@
package net.i2p.router.transport.tcp;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import net.i2p.crypto.AESInputStream;
import net.i2p.crypto.AESOutputStream;
import net.i2p.crypto.DHSessionKeyBuilder;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.DataFormatException;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.router.Router;
import net.i2p.util.Log;
import net.i2p.util.NativeBigInteger;
/**
* Class responsible for all of the handshaking necessary to turn a socket into
* a TCPConnection.
*
*/
public class ConnectionHandler {
private RouterContext _context;
private Log _log;
private TCPTransport _transport;
/** who we're actually talking with */
private RouterInfo _actualPeer;
/** raw socket to the peer */
private Socket _socket;
/** raw stream to read from the peer */
private InputStream _rawIn;
/** raw stream to write to the peer */
private OutputStream _rawOut;
/** secure stream to read from the peer */
private InputStream _connectionIn;
/** secure stream to write to the peer */
private OutputStream _connectionOut;
/** protocol version agreed to, or -1 */
private int _agreedProtocol;
/**
* Contains a message describing why the connection failed (or null if it
* succeeded). This should include a timestamp of some sort.
*/
private String _error;
/**
* If we're handing a reachability test, set this to true once
* we're done
*/
private boolean _testComplete;
/** IP address of the peer who contacted us */
private String _from;
/** Where we verified their address */
private TCPAddress _remoteAddress;
/** connection tag to identify ourselves, or null if no known tag is available */
private ByteArray _connectionTag;
/** connection tag to identify ourselves next time */
private ByteArray _nextConnectionTag;
/** nonce the peer gave us */
private ByteArray _nonce;
/** key that we will be encrypting comm with */
private SessionKey _key;
/** initialization vector for the encryption */
private byte[] _iv;
public ConnectionHandler(RouterContext ctx, TCPTransport transport, Socket socket) {
_context = ctx;
_log = ctx.logManager().getLog(ConnectionHandler.class);
_transport = transport;
_socket = socket;
_error = null;
_agreedProtocol = -1;
InetAddress addr = _socket.getInetAddress();
if (addr != null) {
_from = addr.getHostAddress();
}
}
/**
* Blocking call to establish a TCP connection over the current socket.
* At this point, no data whatsoever need to have been transmitted over the
* socket - the builder is responsible for all aspects of the handshaking.
*
* @return fully established but not yet running connection, or null on error
*/
public TCPConnection receiveConnection() {
try {
_rawIn = _socket.getInputStream();
_rawOut = _socket.getOutputStream();
} catch (IOException ioe) {
fail("Error accessing the socket streams from " + _from, ioe);
return null;
}
negotiateProtocol();
if ( (_agreedProtocol < 0) || (_error != null) )
return null;
boolean ok = false;
if ( (_connectionTag != null) && (_key != null) )
ok = connectExistingSession();
else
ok = connectNewSession();
if (_log.shouldLog(Log.DEBUG))
_log.debug("connection ok? " + ok + " error: " + _error);
if (ok && (_error == null) ) {
establishComplete();
if (_log.shouldLog(Log.INFO))
_log.info("Establishment ok... building the con");
TCPConnection con = new TCPConnection(_context);
con.setInputStream(_connectionIn);
con.setOutputStream(_connectionOut);
con.setSocket(_socket);
con.setRemoteRouterIdentity(_actualPeer.getIdentity());
con.setRemoteAddress(_remoteAddress);
if (_error == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Establishment successful! returning the con");
return con;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Establishment ok but we failed?! error = " + _error);
return null;
}
} else {
return null;
}
}
/**
* Agree on what protocol to communicate with, and set _agreedProtocol
* accordingly. If no common protocols are available, disconnect, set
* _agreedProtocol to -1, and update the _error accordingly.
*/
private void negotiateProtocol() {
boolean ok = readPreferredProtocol();
if (!ok) return;
sendAgreedProtocol();
}
/**
* Receive <code>#bytesFollowing + #versions + v1 [+ v2 [etc]] + tag? + tagData + properties</code>
*
*/
private boolean readPreferredProtocol() {
try {
int numBytes = (int)DataHelper.readLong(_rawIn, 2);
if (numBytes <= 0)
throw new IOException("Invalid number of bytes in connection");
// 0xFF is a reserved value identifying the connection as a reachability test
if (numBytes == 0xFFFF) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ReadProtocol[Y]: test called, handle it");
handleTest();
return false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ReadProtocol[Y]: not a test (line len=" + numBytes + ")");
}
byte line[] = new byte[numBytes];
int read = DataHelper.read(_rawIn, line);
if (read != numBytes) {
fail("Handshake too short from " + _from);
return false;
}
ByteArrayInputStream bais = new ByteArrayInputStream(line);
int numVersions = (int)DataHelper.readLong(bais, 1);
if ( (numVersions <= 0) || (numVersions > 0x8) ) {
fail("Invalid number of protocol versions from " + _from);
return false;
}
int versions[] = new int[numVersions];
for (int i = 0; i < numVersions; i++)
versions[i] = (int)DataHelper.readLong(bais, 1);
for (int i = 0; i < numVersions && _agreedProtocol == -1; i++) {
for (int j = 0; j < TCPTransport.SUPPORTED_PROTOCOLS.length; j++) {
if (versions[i] == TCPTransport.SUPPORTED_PROTOCOLS[j]) {
_agreedProtocol = versions[i];
break;
}
}
}
int tag = (int)DataHelper.readLong(bais, 1);
if (tag == 0x1) {
byte tagData[] = new byte[32];
read = DataHelper.read(bais, tagData);
if (read != 32)
throw new IOException("Not enough data for the tag");
_connectionTag = new ByteArray(tagData);
_key = _transport.getTagManager().getKey(_connectionTag);
if (_key == null)
_connectionTag = null;
}
Properties opts = DataHelper.readProperties(bais);
// ignore them
if (_log.shouldLog(Log.DEBUG))
_log.debug("ReadProtocol[Y]: agreed=" + _agreedProtocol + " tag: "
+ (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : "none"));
return true;
} catch (IOException ioe) {
fail("Error reading the handshake from " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error reading the handshake from " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
}
/**
* Send <code>#bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties</code>
*/
private void sendAgreedProtocol() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
if (_agreedProtocol <= 0)
baos.write(0x0);
else
baos.write(_agreedProtocol);
byte ip[] = _from.getBytes();
baos.write(ip.length);
baos.write(ip);
if (_key != null)
baos.write(0x1);
else
baos.write(0x0);
byte nonce[] = new byte[4];
_context.random().nextBytes(nonce);
_nonce = new ByteArray(nonce);
baos.write(nonce);
Properties opts = new Properties();
opts.setProperty("foo", "bar");
DataHelper.writeProperties(baos, opts); // no options atm
byte line[] = baos.toByteArray();
DataHelper.writeLong(_rawOut, 2, line.length);
_rawOut.write(line);
_rawOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SendProtocol[Y]: agreed=" + _agreedProtocol + " IP: " + _from
+ " nonce: " + Base64.encode(nonce) + " tag: "
+ (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : " none")
+ " props: " + opts
+ "\nLine: " + Base64.encode(line));
if (_agreedProtocol <= 0) {
fail("Connection from " + _from + " rejected, since no compatible protocols were found");
return;
}
} catch (IOException ioe) {
fail("Error writing the handshake to " + _from
+ ": " + ioe.getMessage(), ioe);
return;
} catch (DataFormatException dfe) {
fail("Error writing the handshake to " + _from
+ ": " + dfe.getMessage(), dfe);
return;
}
}
/** Set the next tag to <code>H(E(nonce + tag, sessionKey))</code> */
private void updateNextTagExisting() {
byte pre[] = new byte[48];
byte encr[] = _context.AESEngine().encrypt(pre, _key, _iv);
Hash h = _context.sha().calculateHash(encr);
_nextConnectionTag = new ByteArray(h.getData());
}
/**
* We have a valid tag, so use it to do the handshaking. On error, fail()
* appropriately.
*
* @return true if the connection went ok, or false if it failed.
*/
private boolean connectExistingSession() {
// iv to the SHA256 of the tag appended by the nonce.
byte data[] = new byte[36];
System.arraycopy(_connectionTag.getData(), 0, data, 0, 32);
System.arraycopy(_nonce.getData(), 0, data, 32, 4);
Hash h = _context.sha().calculateHash(data);
_iv = new byte[16];
System.arraycopy(h.getData(), 0, _iv, 0, 16);
updateNextTagExisting();
_rawOut = new AESOutputStream(_context, _rawOut, _key, _iv);
_rawIn = new AESInputStream(_context, _rawIn, _key, _iv);
// read: H(nonce)
try {
Hash readHash = new Hash();
readHash.readBytes(_rawIn);
Hash expected = _context.sha().calculateHash(_nonce.getData());
if (!expected.equals(readHash)) {
fail("Verification hash failed from " + _from);
return false;
}
} catch (IOException ioe) {
fail("Error reading the encrypted nonce from " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error reading the encrypted nonce from " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
// send: H(tag)
try {
Hash tagHash = _context.sha().calculateHash(_connectionTag.getData());
tagHash.writeBytes(_rawOut);
_rawOut.flush();
} catch (IOException ioe) {
fail("Error writing the encrypted tag to " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error writing the encrypted tag to " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
long clockSkew = 0;
// read: routerInfo + currentTime + H(routerInfo + currentTime + nonce + tag)
try {
RouterInfo peer = new RouterInfo();
peer.readBytes(_rawIn);
Date now = DataHelper.readDate(_rawIn);
Hash readHash = new Hash();
readHash.readBytes(_rawIn);
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
peer.writeBytes(baos);
DataHelper.writeDate(baos, now);
baos.write(_nonce.getData());
baos.write(_connectionTag.getData());
Hash expectedHash = _context.sha().calculateHash(baos.toByteArray());
if (!expectedHash.equals(readHash)) {
fail("Invalid hash read for the info from " + _from);
return false;
}
_actualPeer = peer;
clockSkew = _context.clock().now() - now.getTime();
} catch (IOException ioe) {
fail("Error reading the peer info from " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error reading the peer info from " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
// verify routerInfo
boolean reachable = verifyReachability();
// send routerInfo + status + properties + H(routerInfo + status + properties + nonce + tag)
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
_context.router().getRouterInfo().writeBytes(baos);
Properties props = new Properties();
int status = -1;
if (!reachable) {
status = 1;
} else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR)
|| (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) {
status = 2;
SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddhhmmssSSS");
props.setProperty("SKEW", fmt.format(new Date(_context.clock().now())));
} else {
status = 0;
}
baos.write(status);
DataHelper.writeProperties(baos, props);
byte beginning[] = baos.toByteArray();
baos.write(_nonce.getData());
baos.write(_connectionTag.getData());
Hash verification = _context.sha().calculateHash(baos.toByteArray());
_rawOut.write(beginning);
verification.writeBytes(_rawOut);
_rawOut.flush();
return handleStatus(status, clockSkew);
} catch (IOException ioe) {
fail("Error writing the peer info to " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error writing the peer info to " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
}
/**
*
* We do not have a valid tag, so DH then do the handshaking. On error,
* fail() appropriately.
*
* @return true if the connection went ok, or false if it failed.
*/
private boolean connectNewSession() {
DHSessionKeyBuilder builder = null;
try {
builder = DHSessionKeyBuilder.exchangeKeys(_rawIn, _rawOut);
} catch (IOException ioe) {
fail("Error exchanging keys with " + _from);
return false;
}
// load up the key initialize the encrypted streams
_key = builder.getSessionKey();
byte extra[] = builder.getExtraBytes().getData();
_iv = new byte[16];
System.arraycopy(extra, 0, _iv, 0, 16);
byte nextTag[] = new byte[32];
System.arraycopy(extra, 16, nextTag, 0, 32);
_nextConnectionTag = new ByteArray(nextTag);
if (_log.shouldLog(Log.DEBUG))
_log.debug("\nNew session[Y]: key=" + _key.toBase64() + " iv="
+ Base64.encode(_iv) + " nonce=" + Base64.encode(_nonce.getData())
+ " socket: " + _socket);
_rawOut = new AESOutputStream(_context, _rawOut, _key, _iv);
_rawIn = new AESInputStream(_context, _rawIn, _key, _iv);
// read: H(nonce)
try {
Hash h = new Hash();
h.readBytes(_rawIn);
Hash expected = _context.sha().calculateHash(_nonce.getData());
if (!expected.equals(h)) {
fail("Hash after negotiation from " + _from + " does not match");
return false;
}
} catch (IOException ioe) {
fail("Error reading the hash from " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error reading the hash from " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
// send: H(nextTag)
try {
Hash h = _context.sha().calculateHash(_nextConnectionTag.getData());
h.writeBytes(_rawOut);
_rawOut.flush();
} catch (IOException ioe) {
fail("Error writing the hash to " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error writing the hash to " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
long clockSkew = 0;
boolean sigOk = false;
// read: routerInfo + currentTime
// + S(routerInfo + currentTime + nonce + nextTag, routerIdent.signingKey)
try {
RouterInfo info = new RouterInfo();
info.readBytes(_rawIn);
Date now = DataHelper.readDate(_rawIn);
Signature sig = new Signature();
sig.readBytes(_rawIn);
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
info.writeBytes(baos);
DataHelper.writeDate(baos, now);
baos.write(_nonce.getData());
baos.write(_nextConnectionTag.getData());
sigOk = _context.dsa().verifySignature(sig, baos.toByteArray(),
info.getIdentity().getSigningPublicKey());
clockSkew = _context.clock().now() - now.getTime();
_actualPeer = info;
} catch (IOException ioe) {
fail("Error reading the info from " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error reading the info from " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
// verify routerInfo
boolean reachable = verifyReachability();
// send: routerInfo + status + properties
// + S(routerInfo + status + properties + nonce + nextTag, routerIdent.signingKey)
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
_context.router().getRouterInfo().writeBytes(baos);
Properties props = new Properties();
int status = -1;
if (!reachable) {
status = 1;
} else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR)
|| (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) {
status = 2;
SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddhhmmssSSS");
props.setProperty("SKEW", fmt.format(new Date(_context.clock().now())));
} else if (!sigOk) {
status = 3;
} else {
status = 0;
}
baos.write(status);
DataHelper.writeProperties(baos, props);
byte beginning[] = baos.toByteArray();
baos.write(_nonce.getData());
baos.write(_nextConnectionTag.getData());
Signature sig = _context.dsa().sign(baos.toByteArray(),
_context.keyManager().getSigningPrivateKey());
_rawOut.write(beginning);
sig.writeBytes(_rawOut);
_rawOut.flush();
return handleStatus(status, clockSkew);
} catch (IOException ioe) {
fail("Error writing the info to " + _from
+ ": " + ioe.getMessage(), ioe);
return false;
} catch (DataFormatException dfe) {
fail("Error writing the info to " + _from
+ ": " + dfe.getMessage(), dfe);
return false;
}
}
/**
* Act according to the status code, failing as necessary and returning
* whether we should continue going or not.
*
* @return true if we should keep going.
*/
private boolean handleStatus(int status, long clockSkew) {
switch (status) {
case 0: // ok
return true;
case 1:
fail("Peer " + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)
+ " at " + _from + " is unreachable");
return false;
case 2:
fail("Peer " + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)
+ " was skewed by " + DataHelper.formatDuration(clockSkew));
return false;
case 3:
fail("Forged signature on " + _from + " pretending to be "
+ _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6));
return false;
default:
fail("Unknown error verifying "
+ _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)
+ ": " + status);
return false;
}
}
/**
* Can the peer be contacted on their public addresses? If so,
* be sure to set _remoteAddress. We can do this without branching onto
* another thread because we already have a timer killing this handler if
* it takes too long
*/
private boolean verifyReachability() {
if (_actualPeer == null) return false;
_remoteAddress = new TCPAddress(_actualPeer.getTargetAddress(TCPTransport.STYLE));
//if (true) return true;
Socket s = null;
try {
s = new Socket(_remoteAddress.getAddress(), _remoteAddress.getPort());
OutputStream out = s.getOutputStream();
InputStream in = s.getInputStream();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Beginning verification of reachability");
// send: 0xFFFF + #versions + v1 [+ v2 [etc]] + properties
out.write(0xFF);
out.write(0xFF);
out.write(TCPTransport.SUPPORTED_PROTOCOLS.length);
for (int i = 0; i < TCPTransport.SUPPORTED_PROTOCOLS.length; i++)
out.write(TCPTransport.SUPPORTED_PROTOCOLS[i]);
DataHelper.writeProperties(out, null);
out.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Verification of reachability request sent");
// read: 0xFFFF + versionOk + #bytesIP + IP + currentTime + properties
int ok = in.read();
if (ok != 0xFF)
throw new IOException("Unable to verify the peer - invalid response");
ok = in.read();
if (ok != 0xFF)
throw new IOException("Unable to verify the peer - invalid response");
int version = in.read();
if (version == -1)
throw new IOException("Unable to verify the peer - invalid version");
if (version == 0)
throw new IOException("Unable to verify the peer - no matching version");
int numBytes = in.read();
if ( (numBytes == -1) || (numBytes > 32) )
throw new IOException("Unable to verify the peer - invalid num bytes");
byte ip[] = new byte[numBytes];
int read = DataHelper.read(in, ip);
if (read != numBytes)
throw new IOException("Unable to verify the peer - invalid num bytes");
Date now = DataHelper.readDate(in);
Properties opts = DataHelper.readProperties(in);
return true;
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error verifying "
+ _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)
+ "at " + _remoteAddress, ioe);
return false;
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error verifying "
+ _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)
+ "at " + _remoteAddress, dfe);
return false;
}
}
/**
* The peer contacting us is just testing us. Verify that we are reachable
* by following the protocol, then close the socket. This is called only
* after reading the initial 0xFF.
*
*/
private void handleTest() {
try {
// read: #versions + v1 [+ v2 [etc]] + properties
int numVersions = _rawIn.read();
if (numVersions == -1) throw new IOException("Unable to read versions");
if (numVersions > 256) throw new IOException("Too many versions");
int versions[] = new int[numVersions];
for (int i = 0; i < numVersions; i++) {
versions[i] = _rawIn.read();
if (versions[i] == -1)
throw new IOException("Not enough versions");
}
Properties opts = DataHelper.readProperties(_rawIn);
int version = 0;
for (int i = 0; i < versions.length && version == 0; i++) {
for (int j = 0; j < TCPTransport.SUPPORTED_PROTOCOLS.length; j++) {
if (TCPTransport.SUPPORTED_PROTOCOLS[j] == versions[i]) {
version = versions[i];
break;
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("HandleTest: version=" + version + " opts=" +opts);
// send: 0xFF + versionOk + #bytesIP + IP + currentTime + properties
_rawOut.write(0xFF);
_rawOut.write(0xFF);
_rawOut.write(version);
byte ip[] = _from.getBytes();
_rawOut.write(ip.length);
_rawOut.write(ip);
DataHelper.writeDate(_rawOut, new Date(_context.clock().now()));
DataHelper.writeProperties(_rawOut, null);
_rawOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("HandleTest: result flushed");
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to verify test connection from " + _from, ioe);
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to verify test connection from " + _from, dfe);
} finally {
if (_rawIn != null) try { _rawIn.close(); } catch (IOException ioe) {}
if (_rawOut != null) try { _rawOut.close(); } catch (IOException ioe) {}
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
_rawIn = null;
_rawOut = null;
_agreedProtocol = -1;
_nonce = null;
_connectionTag = null;
_actualPeer = null;
_testComplete = true;
}
// send: 0xFF + versionOk + #bytesIP + IP + currentTime + properties
}
/**
* Finish up the establishment (wrapping the streams, storing the netDb,
* persisting the connection tags, etc)
*
*/
private void establishComplete() {
// todo: add bw limiter
_connectionIn = _rawIn;
_connectionOut = _rawOut;
Hash peer = _actualPeer.getIdentity().getHash();
_context.netDb().store(peer, _actualPeer);
_transport.getTagManager().replaceTag(peer, _nextConnectionTag, _key);
}
public String getError() { return _error; }
public boolean getTestComplete() { return _testComplete; }
/**
* Kill the handler, closing all sockets and streams, setting everything
* back to failure states, and setting the given error.
*
*/
private void fail(String error) {
fail(error, null);
}
private void fail(String error, Exception e) {
if (_error == null) // only grab the first error
_error = error;
if (_rawIn != null) try { _rawIn.close(); } catch (IOException ioe) {}
if (_rawOut != null) try { _rawOut.close(); } catch (IOException ioe) {}
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
_rawIn = null;
_rawOut = null;
_agreedProtocol = -1;
_nonce = null;
_connectionTag = null;
_actualPeer = null;
if (_log.shouldLog(Log.WARN))
_log.warn(error, e);
}
}

View File

@ -0,0 +1,84 @@
package net.i2p.router.transport.tcp;
import java.io.IOException;
import java.io.OutputStream;
import net.i2p.data.DataFormatException;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Push out I2NPMessages across the wire
*
*/
class ConnectionRunner implements Runnable {
private Log _log;
private RouterContext _context;
private TCPConnection _con;
private boolean _keepRunning;
public ConnectionRunner(RouterContext ctx, TCPConnection con) {
_context = ctx;
_log = ctx.logManager().getLog(ConnectionRunner.class);
_con = con;
_keepRunning = false;
}
public void startRunning() {
_keepRunning = true;
String name = "TCP " + _context.routerHash().toBase64().substring(0,6)
+ " to "
+ _con.getRemoteRouterIdentity().calculateHash().toBase64().substring(0,6);
I2PThread t = new I2PThread(this, name);
t.start();
}
public void stopRunning() {
_keepRunning = false;
}
public void run() {
while (_keepRunning && !_con.getIsClosed()) {
OutNetMessage msg = _con.getNextMessage();
if ( (msg == null) && (_keepRunning) ) {
_log.error("next message is null but we should keep running?");
} else {
sendMessage(msg);
}
}
}
private void sendMessage(OutNetMessage msg) {
byte data[] = msg.getMessageData();
if (data == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("message " + msg.getMessageType() + "/" + msg.getMessageId()
+ " expired before it could be sent");
_con.sent(msg, false, 0);
return;
}
OutputStream out = _con.getOutputStream();
boolean ok = false;
long before = -1;
long after = -1;
try {
synchronized (out) {
before = _context.clock().now();
out.write(data);
out.flush();
after = _context.clock().now();
}
ok = true;
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error writing out the message", ioe);
}
_con.sent(msg, ok, after - before);
}
}

View File

@ -0,0 +1,64 @@
package net.i2p.router.transport.tcp;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import net.i2p.data.ByteArray;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
/**
* Organize the tags used to connect with peers.
*
*/
public class ConnectionTagManager {
private RouterContext _context;
/** H(routerIdentity) to ByteArray */
private Map _tags;
/** H(routerIdentity) to SessionKey */
private Map _keys;
/** synchronize against this when dealing with the data */
private Object _lock;
public ConnectionTagManager(RouterContext context) {
_context = context;
_tags = new HashMap(128);
_keys = new HashMap(128);
_lock = new Object();
}
/** Retrieve the associated tag (but do not consume it) */
public ByteArray getTag(Hash peer) {
synchronized (_lock) {
return (ByteArray)_tags.get(peer);
}
}
public SessionKey getKey(Hash peer) {
synchronized (_lock) { //
return (SessionKey)_keys.get(peer);
}
}
public SessionKey getKey(ByteArray tag) {
synchronized (_lock) { //
for (Iterator iter = _tags.keySet().iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
ByteArray cur = (ByteArray)_tags.get(peer);
if (cur.equals(tag))
return (SessionKey)_keys.get(peer);
}
return null;
}
}
/** Update the tag associated with a peer, dropping the old one */
public void replaceTag(Hash peer, ByteArray newTag, SessionKey key) {
synchronized (_lock) {
_tags.put(peer, newTag);
_keys.put(peer, key);
}
}
}

View File

@ -0,0 +1,37 @@
package net.i2p.router.transport.tcp;
import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.i2np.I2NPMessageReader;
import net.i2p.data.i2np.I2NPMessage;
/**
* Receive messages from a message reader and bounce them off to the transport
* for further enqueueing.
*/
public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListener {
private TCPTransport _transport;
private TCPConnection _con;
private RouterIdentity _ident;
private Hash _identHash;
public MessageHandler(TCPTransport transport, TCPConnection con) {
_transport = transport;
_con = con;
_ident = con.getRemoteRouterIdentity();
_identHash = _ident.calculateHash();
}
public void disconnected(I2NPMessageReader reader) {
_con.closeConnection();
}
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead) {
_transport.messageReceived(message, _ident, _identHash, msToRead, message.getSize());
}
public void readError(I2NPMessageReader reader, Exception error) {
_con.closeConnection();
}
}

View File

@ -1,338 +0,0 @@
package net.i2p.router.transport.tcp;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.net.Socket;
import java.util.Date;
import java.util.Iterator;
import net.i2p.crypto.AESInputStream;
import net.i2p.crypto.AESOutputStream;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.BandwidthLimitedInputStream;
import net.i2p.router.transport.BandwidthLimitedOutputStream;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* TCPConnection that validates the time and protocol version, dropping connection if
* the clocks are too skewed or the versions don't match.
*
*/
class RestrictiveTCPConnection extends TCPConnection {
private Log _log;
public RestrictiveTCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException {
super(context, s, locallyInitiated);
_log = context.logManager().getLog(RestrictiveTCPConnection.class);
_context.statManager().createRateStat("tcp.establishConnectionTime", "How long does it take for us to successfully establish a connection (either locally or remotely initiated)?", "TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
}
/** passed in the handshake process for the connection, and only equivilant protocols will be accepted */
private final static long PROTO_ID = 13;
/** read / write buffer size */
private final static int BUF_SIZE = 2*1024;
private boolean validateVersion() throws DataFormatException, IOException {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before validating version");
ByteArrayOutputStream baos = new ByteArrayOutputStream(8);
DataHelper.writeLong(baos, 4, PROTO_ID);
byte encr[] = _context.AESEngine().safeEncrypt(baos.toByteArray(), _key, _iv, 16);
DataHelper.writeLong(_out, 2, encr.length);
_out.write(encr);
if (_log.shouldLog(Log.DEBUG)) _log.debug("Version sent");
// we've sent our version, now read what theirs is
int rlen = (int)DataHelper.readLong(_in, 2);
byte pencr[] = new byte[rlen];
int read = DataHelper.read(_in, pencr);
if (read != rlen)
throw new DataFormatException("Not enough data in peer version");
byte decr[] = _context.AESEngine().safeDecrypt(pencr, _key, _iv);
if (decr == null)
throw new DataFormatException("Unable to decrypt - failed version?");
ByteArrayInputStream bais = new ByteArrayInputStream(decr);
long peerProtoId = DataHelper.readLong(bais, 4);
if (_log.shouldLog(Log.DEBUG)) _log.debug("Version received [" + peerProtoId + "]");
return validateVersion(PROTO_ID, peerProtoId);
}
private boolean validateVersion(long us, long them) throws DataFormatException, IOException {
if (us != them) {
if (_log.shouldLog(Log.ERROR))
_log.error("INVALID PROTOCOL VERSIONS! us = " + us + " them = " + them + ": " + _remoteIdentity.getHash());
if (them > us)
_context.router().setHigherVersionSeen(true);
return false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Valid protocol version: us = " + us + " them = " + them + ": " + _remoteIdentity.getHash());
return true;
}
}
private boolean validateTime() throws DataFormatException, IOException {
Date now = new Date(_context.clock().now());
ByteArrayOutputStream baos = new ByteArrayOutputStream(8);
DataHelper.writeDate(baos, now);
byte encr[] = _context.AESEngine().safeEncrypt(baos.toByteArray(), _key, _iv, 16);
DataHelper.writeLong(_out, 2, encr.length);
_out.write(encr);
// we've sent our date, now read what theirs is
int rlen = (int)DataHelper.readLong(_in, 2);
byte pencr[] = new byte[rlen];
int read = DataHelper.read(_in, pencr);
if (read != rlen)
throw new DataFormatException("Not enough data in peer date");
byte decr[] = _context.AESEngine().safeDecrypt(pencr, _key, _iv);
if (decr == null)
throw new DataFormatException("Unable to decrypt - failed date?");
ByteArrayInputStream bais = new ByteArrayInputStream(decr);
Date theirNow = DataHelper.readDate(bais);
long diff = now.getTime() - theirNow.getTime();
if ( (diff > Router.CLOCK_FUDGE_FACTOR) || (diff < (0-Router.CLOCK_FUDGE_FACTOR)) ) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer is out of time sync by " + DataHelper.formatDuration(diff)
+ "! They think it is " + theirNow + ", we think it is "
+ new Date(_context.clock().now()) + ": " + _remoteIdentity.getHash(),
new Exception("Time sync error - please make sure your clock is correct!"));
return false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer sync difference: " + diff + "ms: " + _remoteIdentity.getHash());
return true;
}
}
/**
* Exchange TCP addresses, and if we're didn't establish this connection, validate
* the peer with validatePeerAddresses(TCPAddress[]).
*
* @return true if the peer is valid (and reachable)
*/
private boolean validatePeerAddress() throws DataFormatException, IOException {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before sending my addresses");
TCPAddress me[] = _transport.getMyAddresses();
ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending " + me.length + " addresses");
DataHelper.writeLong(baos, 1, me.length);
for (int i = 0; i < me.length; i++) {
DataHelper.writeString(baos, me[i].getHost());
DataHelper.writeLong(baos, 2, me[i].getPort());
if (_log.shouldLog(Log.DEBUG)) _log.debug("Sent my address [" + me[i].getHost() + ":" + me[i].getPort() + "]");
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("Sent my " + me.length + " addresses");
byte encr[] = _context.AESEngine().safeEncrypt(baos.toByteArray(), _key, _iv, 256);
DataHelper.writeLong(_out, 2, encr.length);
_out.write(encr);
// we've sent our addresses, now read their addresses
int rlen = (int)DataHelper.readLong(_in, 2);
byte pencr[] = new byte[rlen];
int read = DataHelper.read(_in, pencr);
if (read != rlen)
throw new DataFormatException("Not enough data in peer addresses");
byte decr[] = _context.AESEngine().safeDecrypt(pencr, _key, _iv);
if (decr == null)
throw new DataFormatException("Unable to decrypt - invalid addresses?");
ByteArrayInputStream bais = new ByteArrayInputStream(decr);
long numAddresses = DataHelper.readLong(bais, 1);
if (_log.shouldLog(Log.DEBUG)) _log.debug("Peer will send us " + numAddresses + " addresses");
TCPAddress peer[] = new TCPAddress[(int)numAddresses];
for (int i = 0; i < peer.length; i++) {
String host = DataHelper.readString(bais);
int port = (int)DataHelper.readLong(bais, 2);
peer[i] = new TCPAddress(host, port);
if (_log.shouldLog(Log.DEBUG)) _log.debug("Received peer address [" + peer[i].getHost() + ":" + peer[i].getPort() + "]");
}
// ok, we've received their addresses, now we determine whether we need to
// validate them or not
if (weInitiatedConnection()) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("We initiated the connection, so no need to validate");
return true; // we connected to them, so we know we can, um, connect to them
} else {
if (_log.shouldLog(Log.DEBUG)) _log.debug("We received the connection, so validate");
boolean valid = validatePeerAddresses(peer);
if (_log.shouldLog(Log.DEBUG)) _log.debug("We received the connection, validated? " + valid);
return valid;
}
}
/**
* They connected to us, but since we don't want to deal with restricted route topologies
* (yet), we want to make sure *they* are reachable by other people. In the long run, we'll
* likely want to test this by routing messages through random peers to see if *they* can
* contact them (but only when we want to determine whether to use them as a gateway, etc).
*
* Oh, I suppose I should explain what this method does, not just why. Ok, this iterates
* through all of the supplied TCP addresses attempting to open a socket. If it receives
* any data on that socket, we'll assume their address is valid and we're satisfied. (yes,
* this means it could point at random addresses, etc - this is not sufficient for dealing
* with hostile peers, just with misconfigured peers). If we can't find a peer address that
* we can connect to, they suck and can go eat worms.
*
*/
private boolean validatePeerAddresses(TCPAddress addresses[]) throws DataFormatException, IOException {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before validating peer addresses [" + addresses.length + "]...");
for (int i = 0; i < addresses.length; i++) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before validating peer address (" + addresses[i].getHost() + ":"+ addresses[i].getPort() + ")...");
boolean ok = sendsUsData(addresses[i]);
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before validating peer address (" + addresses[i].getHost() + ":"+ addresses[i].getPort() + ") [" + ok + "]...");
if (ok) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer address " + addresses[i].getHost() + ":" + addresses[i].getPort() + " validated!");
return true;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Peer address " + addresses[i].getHost() + ":" + addresses[i].getPort() + " could NOT be validated");
}
}
if (_log.shouldLog(Log.WARN))
_log.warn("None of the peer addresses could be validated!");
return false;
}
private boolean sendsUsData(TCPAddress peer) {
SocketCreator creator = new SocketCreator(peer.getHost(), peer.getPort(), false);
// blocking call, timing out after the SOCKET_CREATE_TIMEOUT if there
// isn't a definitive yes or no on whether the peer is running I2NP or not
// the call closes the socket created regardless
boolean established = creator.verifyReachability(TCPTransport.SOCKET_CREATE_TIMEOUT);
if (_log.shouldLog(Log.DEBUG))
_log.debug("After joining socket creator via peer callback [could establish? " + established + "]");
return established;
}
public RouterIdentity establishConnection() {
long start = _context.clock().now();
long success = 0;
if (_log.shouldLog(Log.DEBUG)) _log.debug("Establishing connection...");
BigInteger myPub = _builder.getMyPublicValue();
try {
_socket.setSoTimeout(ESTABLISHMENT_TIMEOUT);
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before key exchange...");
exchangeKey();
if (_log.shouldLog(Log.DEBUG)) _log.debug("Key exchanged...");
// key exchanged. now say who we are and prove it
boolean ok = identifyStationToStation();
if (_log.shouldLog(Log.DEBUG)) _log.debug("After station to station [" + ok + "]...");
if (!ok) {
throw new DataFormatException("Station to station identification failed! MITM?");
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("before validateVersion...");
boolean versionOk = validateVersion();
if (_log.shouldLog(Log.DEBUG)) _log.debug("after validateVersion [" + versionOk + "]...");
if (!versionOk) {
// not only do we remove the reference to the invalid peer
_context.netDb().fail(_remoteIdentity.getHash());
// but we make sure that we don't try to talk to them soon even if we get a new ref
_context.shitlist().shitlistRouter(_remoteIdentity.getHash(), "Invalid protocol version");
throw new DataFormatException("Peer uses an invalid version! dropping");
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("before validateTime...");
boolean timeOk = validateTime();
if (_log.shouldLog(Log.DEBUG)) _log.debug("after validateTime [" + timeOk + "]...");
if (!timeOk) {
_context.shitlist().shitlistRouter(_remoteIdentity.getHash(), "Time too far out of sync");
throw new DataFormatException("Peer is too far out of sync with the current router's clock! dropping");
}
try {
_context.netDb().store(_remoteIdentity.getHash(), _remoteInfo);
} catch (IllegalArgumentException iae) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer gave us invalid router info", iae);
// not only do we remove the reference to the invalid peer
_context.netDb().fail(_remoteIdentity.getHash());
// but we make sure that we don't try to talk to them soon even if we get a new ref
_context.shitlist().shitlistRouter(_remoteIdentity.getHash(), "Invalid peer info");
throw new DataFormatException("Invalid peer info provided");
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("before validate peer address...");
boolean peerReachable = validatePeerAddress();
if (_log.shouldLog(Log.DEBUG)) _log.debug("after validatePeerAddress [" + peerReachable + "]...");
if (!peerReachable) {
_context.shitlist().shitlistRouter(_remoteIdentity.getHash(), "Unreachable address");
throw new DataFormatException("Peer provided us with an unreachable router address, and we can't handle restricted routes yet! dropping");
}
if (_log.shouldLog(Log.INFO))
_log.info("TCP connection " + _id + " established with " + _remoteIdentity.getHash().toBase64());
_in = new BandwidthLimitedInputStream(_context, new AESInputStream(_context, _in, _key, _iv), _remoteIdentity);
_out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv);
_socket.setSoTimeout(0);
success = _context.clock().now();
for (Iterator iter = _remoteInfo.getAddresses().iterator(); iter.hasNext(); ) {
RouterAddress curAddr = (RouterAddress)iter.next();
if (TCPTransport.STYLE.equals(curAddr.getTransportStyle())) {
_remoteAddress = new TCPAddress(curAddr);
break;
}
}
if (_remoteAddress == null) {
throw new DataFormatException("wtf, no TCP addresses? we already verified!");
}
established();
return _remoteIdentity;
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection with " + _remoteHost + ":" + _remotePort, ioe);
closeConnection();
return null;
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection with " + _remoteHost + ":" + _remotePort, dfe);
closeConnection();
return null;
} catch (Throwable t) {
if (_log.shouldLog(Log.ERROR))
_log.error("jrandom is paranoid so we're catching it all during establishConnection " + _remoteHost + ":" + _remotePort, t);
closeConnection();
return null;
} finally {
if (success > 0)
_context.statManager().addRateData("tcp.establishConnectionTime", success-start, success-start);
}
}
}

View File

@ -1,217 +0,0 @@
package net.i2p.router.transport.tcp;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* Helper class to coordinate the creation of sockets to I2P routers
*
*/
class SocketCreator implements SimpleTimer.TimedEvent {
private final static Log _log = new Log(SocketCreator.class);
private String _host;
private int _port;
private Socket _socket;
private boolean _keepOpen;
private boolean _established;
private long _created;
private long _timeoutMs;
private String _caller;
public SocketCreator(String host, int port) {
this(host, port, true);
}
public SocketCreator(String host, int port, boolean keepOpen) {
_host = host;
_port = port;
_socket = null;
_keepOpen = keepOpen;
_established = false;
_created = System.currentTimeMillis();
}
public Socket getSocket() { return _socket; }
public boolean couldEstablish() { return _established; }
/** the first byte sent and received must be 0x42 */
public final static int I2P_FLAG = 0x42;
/** sent if we arent trying to talk */
private final static int NOT_I2P_FLAG = 0x2B;
/**
* Blocking call to determine whether the socket configured can be reached
* (and whether it is a valid I2P router). The socket created to test this
* will be closed afterwards.
*
* @param timeoutMs max time to wait for validation
* @return true if the peer is reachable and sends us the I2P_FLAG, false
* otherwise
*/
public boolean verifyReachability(long timeoutMs) {
_timeoutMs = timeoutMs;
_caller = Thread.currentThread().getName();
SimpleTimer.getInstance().addEvent(this, timeoutMs);
checkEstablish();
if (_log.shouldLog(Log.DEBUG))
_log.debug("veriyReachability complete, established? " + _established);
return _established;
}
/**
* Blocking call to establish a socket connection to the peer. After either
* the timeout has expired or the socket has been created, the socket and/or
* its status can be accessed via couldEstablish() and getSocket(),
* respectively. If the socket could not be established in the given time
* frame, the socket is closed.
*
*/
public void establishConnection(long timeoutMs) {
_timeoutMs = timeoutMs;
_caller = Thread.currentThread().getName();
SimpleTimer.getInstance().addEvent(this, timeoutMs);
doEstablish();
if (_log.shouldLog(Log.DEBUG))
_log.debug("EstablishConnection complete, established? " + _established);
}
/**
* Called when the timeout was reached - depending on our configuration and
* whether a connection was established, we may want to tear down the socket.
*
*/
public void timeReached() {
long duration = System.currentTimeMillis() - _created;
if (!_keepOpen) {
if (_socket != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_caller + ": timeReached(), dont keep open, and we have a socket. kill it ("
+ duration + "ms, delay " + _timeoutMs + ")");
try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_caller + ": timeReached(), dont keep open, but we don't have a socket. noop");
}
} else {
if (_established) {
// noop
if (_log.shouldLog(Log.DEBUG))
_log.debug(_caller + ": timeReached(), keep open, and we have an established socket. noop");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_caller + ": timeReached(), keep open, but we havent established yet. kill the socket! ("
+ duration + "ms, delay " + _timeoutMs + ")");
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
}
}
}
/**
* Create the socket with the intent of keeping it open
*
*/
private void doEstablish() {
try {
_socket = new Socket(_host, _port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Socket created");
if (_socket == null) return;
OutputStream os = _socket.getOutputStream();
os.write(I2P_FLAG);
os.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P flag sent");
if (_socket == null) return;
int val = _socket.getInputStream().read();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]");
if (val != I2P_FLAG) {
if (_socket != null)
_socket.close();
_socket = null;
}
_established = true;
return;
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port, uhe);
if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {}
_socket = null;
return;
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error establishing", ioe);
if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {}
_socket = null;
return;
} catch (Exception e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown error establishing connection to " + _host + ':' + _port + ": " + e.getMessage());
if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {}
_socket = null;
return;
}
}
/**
* Try to establish the connection, but don't actually send the I2P flag. The
* other side will timeout waiting for it and consider it a dropped connection,
* but since they will have sent us the I2P flag first we will still know they are
* reachable.
*
*/
private void checkEstablish() {
try {
_socket = new Socket(_host, _port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Socket created (but we're not sending the flag, since we're just testing them)");
if (_socket == null) return;
OutputStream os = _socket.getOutputStream();
os.write(NOT_I2P_FLAG);
os.flush();
if (_socket == null) return;
int val = _socket.getInputStream().read();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]");
if (_socket == null) return;
_socket.close();
_socket = null;
_established = (val == I2P_FLAG);
return;
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port, uhe);
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
return;
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error establishing", ioe);
if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {}
_socket = null;
return;
} catch (Exception e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown error establishing connection to " + _host + ':' + _port + ": " + e.getMessage());
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
return;
}
}
}

View File

@ -10,6 +10,7 @@ package net.i2p.router.transport.tcp;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress;
@ -44,17 +45,18 @@ public class TCPAddress {
}
public TCPAddress() {
_host = null;
_port = -1;
_addr = null;
_host = null;
_port = -1;
_addr = null;
}
public TCPAddress(InetAddress addr, int port) {
if (addr != null)
_host = addr.getHostAddress();
_addr = addr;
_port = port;
_addr = addr;
_port = port;
}
public TCPAddress(RouterAddress addr) {
if (addr == null) throw new IllegalArgumentException("Null router address");
String host = addr.getOptions().getProperty(PROP_HOST);
@ -80,6 +82,21 @@ public class TCPAddress {
}
}
public RouterAddress toRouterAddress() {
if ( (_host == null) || (_port <= 0) )
return null;
RouterAddress addr = new RouterAddress();
Properties props = new Properties();
props.setProperty(PROP_HOST, _host);
props.setProperty(PROP_PORT, ""+_port);
addr.setOptions(props);
addr.setTransportStyle(TCPTransport.STYLE);
return addr;
}
public String getHost() { return _host; }
public void setHost(String host) { _host = host; }
public InetAddress getAddress() { return _addr; }
@ -88,44 +105,46 @@ public class TCPAddress {
public void setPort(int port) { _port = port; }
public boolean isPubliclyRoutable() {
if (_host == null) return false;
try {
InetAddress addr = InetAddress.getByName(_host);
byte quad[] = addr.getAddress();
if (quad[0] == (byte)127) return false;
if (quad[0] == (byte)10) return false;
if ( (quad[0] == (byte)172) && (quad[1] >= (byte)16) && (quad[1] <= (byte)31) ) return false;
if ( (quad[0] == (byte)192) && (quad[1] == (byte)168) ) return false;
if (quad[0] >= (byte)224) return false; // no multicast
return true; // or at least possible to be true
} catch (Throwable t) {
_log.error("Error checking routability", t);
return false;
}
if (_host == null) return false;
try {
InetAddress addr = InetAddress.getByName(_host);
byte quad[] = addr.getAddress();
if (quad[0] == (byte)127) return false;
if (quad[0] == (byte)10) return false;
if ( (quad[0] == (byte)172) && (quad[1] >= (byte)16) && (quad[1] <= (byte)31) ) return false;
if ( (quad[0] == (byte)192) && (quad[1] == (byte)168) ) return false;
if (quad[0] >= (byte)224) return false; // no multicast
return true; // or at least possible to be true
} catch (Throwable t) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error checking routability", t);
return false;
}
}
public String toString() { return _host + ":" + _port; }
public int hashCode() {
int rv = 0;
rv += _port;
if (_addr != null) rv += _addr.getHostAddress().hashCode();
else
if (_host != null) rv += _host.hashCode();
return rv;
int rv = 0;
rv += _port;
if (_addr != null)
rv += _addr.getHostAddress().hashCode();
else
if (_host != null) rv += _host.hashCode();
return rv;
}
public boolean equals(Object val) {
if ( (val != null) && (val instanceof TCPAddress) ) {
TCPAddress addr = (TCPAddress)val;
if ( (_addr != null) && (_addr.getHostAddress() != null) ) {
return DataHelper.eq(getAddress().getHostAddress(), addr.getAddress().getHostAddress()) &&
(getPort() == addr.getPort());
if ( (val != null) && (val instanceof TCPAddress) ) {
TCPAddress addr = (TCPAddress)val;
if ( (_addr != null) && (_addr.getHostAddress() != null) ) {
return DataHelper.eq(getAddress().getHostAddress(), addr.getAddress().getHostAddress())
&& (getPort() == addr.getPort());
} else {
return DataHelper.eq(getHost(), addr.getHost()) &&
(getPort() == addr.getPort());
return DataHelper.eq(getHost(), addr.getHost())
&& (getPort() == addr.getPort());
}
}
return false;
}
return false;
}
}

View File

@ -1,666 +1,210 @@
package net.i2p.router.transport.tcp;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import net.i2p.crypto.AESInputStream;
import net.i2p.crypto.AESOutputStream;
import net.i2p.crypto.DHSessionKeyBuilder;
import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageReader;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.BandwidthLimitedInputStream;
import net.i2p.router.transport.BandwidthLimitedOutputStream;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.NativeBigInteger;
/**
* Wraps a connection - this contains a reader thread (via I2NPMessageReader) and
* a writer thread (ConnectionRunner). The writer reads the pool of outbound
* messages and writes them in order, while the reader fires off events
* Central choke point for a single TCP connection to a single peer.
*
*/
class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
public class TCPConnection {
private Log _log;
protected static int _idCounter = 0;
protected int _id;
protected DHSessionKeyBuilder _builder;
protected Socket _socket;
protected String _remoteHost;
protected int _remotePort;
protected I2NPMessageReader _reader;
protected InputStream _in;
protected OutputStream _out;
protected RouterIdentity _remoteIdentity;
protected RouterInfo _remoteInfo;
protected TCPTransport _transport;
protected ConnectionRunner _runner;
protected List _toBeSent;
protected SessionKey _key;
protected ByteArray _extraBytes;
protected byte[] _iv;
private RouterContext _context;
private RouterIdentity _ident;
private TCPAddress _remoteAddress;
private List _pendingMessages;
private InputStream _in;
private OutputStream _out;
private Socket _socket;
private TCPTransport _transport;
private ConnectionRunner _runner;
private I2NPMessageReader _reader;
private long _started;
private boolean _closed;
private boolean _weInitiated;
private long _created;
protected RouterContext _context;
protected TCPAddress _remoteAddress;
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException {
_context = context;
_log = context.logManager().getLog(TCPConnection.class);
_context.statManager().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added (only when it wasnt empty)?",
"TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("tcp.writeTimeLarge", "How long it takes to write a message that is over 2K?",
"TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("tcp.writeTimeSmall", "How long it takes to write a message that is under 2K?",
"TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("tcp.writeTimeSlow", "How long it takes to write a message (ignoring messages transferring in under a second)?",
"TCP Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_id = ++_idCounter;
_weInitiated = locallyInitiated;
public TCPConnection(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(TCPConnection.class);
_pendingMessages = new ArrayList(4);
_ident = null;
_remoteAddress = null;
_in = null;
_out = null;
_socket = null;
_transport = null;
_started = -1;
_closed = false;
_socket = s;
_created = -1;
_toBeSent = new ArrayList();
try {
_in = _socket.getInputStream();
_out = _socket.getOutputStream();
} catch (IOException ioe) {
_log.error("Error getting streams for the connection", ioe);
}
_builder = new DHSessionKeyBuilder();
_extraBytes = null;
// sun keeps the socket's InetAddress around after its been closed, but kaffe (and the rest of classpath)
// doesn't, so we've got to check & cache it here if we want to log it later. (kaffe et al are acting per
// spec, btw)
try {
InetAddress addr = s.getInetAddress();
if (addr != null) {
_remoteHost = addr.getHostAddress();
}
_remotePort = s.getPort();
if (locallyInitiated)
_remoteAddress = new TCPAddress(_remoteHost, _remotePort);
} catch (NullPointerException npe) {
throw new IOException("kaffe is being picky since the socket closed too fast...");
}
if (_log.shouldLog(Log.INFO))
_log.info("Connected with peer: " + _remoteHost + ":" + _remotePort);
_runner = new ConnectionRunner(_context, this);
}
/** Who are we talking with (or null if not identified) */
public RouterIdentity getRemoteRouterIdentity() { return _ident; }
/** What is the peer's TCP address (using the IP address not hostname) */
public TCPAddress getRemoteAddress() { return _remoteAddress; }
/** Who are we talking with (or null if not identified) */
public void setRemoteRouterIdentity(RouterIdentity ident) { _ident = ident; }
/** What is the peer's TCP address (using the IP address not hostname) */
public void setRemoteAddress(TCPAddress addr) { _remoteAddress = addr; }
/** how long has this connection been around for, or -1 if it isn't established yet */
public long getLifetime() {
if (_created > 0)
return _context.clock().now() - _created;
else
return -1;
}
protected boolean weInitiatedConnection() { return _weInitiated; }
public RouterIdentity getRemoteRouterIdentity() { return _remoteIdentity; }
int getId() { return _id; }
int getPendingMessageCount() {
synchronized (_toBeSent) {
return _toBeSent.size();
}
}
protected void exchangeKey() throws IOException, DataFormatException {
BigInteger myPub = _builder.getMyPublicValue();
byte myPubBytes[] = myPub.toByteArray();
DataHelper.writeLong(_out, 2, myPubBytes.length);
_out.write(myPubBytes);
int rlen = (int)DataHelper.readLong(_in, 2);
byte peerPubBytes[] = new byte[rlen];
int read = DataHelper.read(_in, peerPubBytes);
if (_log.shouldLog(Log.DEBUG))
_log.debug("rlen: " + rlen + " peerBytes: " + DataHelper.toString(peerPubBytes) + " read: " + read);
BigInteger peerPub = new NativeBigInteger(1, peerPubBytes);
_builder.setPeerPublicValue(peerPub);
_key = _builder.getSessionKey();
_extraBytes = _builder.getExtraBytes();
_iv = new byte[16];
System.arraycopy(_extraBytes.getData(), 0, _iv, 0, Math.min(_extraBytes.getData().length, _iv.length));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session key: " + _key.toBase64() + " extra bytes: " + _extraBytes.getData().length);
}
protected boolean identifyStationToStation() throws IOException, DataFormatException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(10*1024);
_context.router().getRouterInfo().writeBytes(baos);
Hash keyHash = _context.sha().calculateHash(_key.getData());
keyHash.writeBytes(baos);
Signature sig = _context.dsa().sign(baos.toByteArray(), _context.keyManager().getSigningPrivateKey());
sig.writeBytes(baos);
byte encr[] = _context.AESEngine().safeEncrypt(baos.toByteArray(), _key, _iv, 10*1024);
DataHelper.writeLong(_out, 2, encr.length);
_out.write(encr);
// we've identified ourselves, now read who they are
int rlen = (int)DataHelper.readLong(_in, 2);
byte pencr[] = new byte[rlen];
int read = DataHelper.read(_in, pencr);
if (read != rlen)
throw new DataFormatException("Not enough data in peer ident");
byte decr[] = _context.AESEngine().safeDecrypt(pencr, _key, _iv);
if (decr == null)
throw new DataFormatException("Unable to decrypt - failed exchange?");
ByteArrayInputStream bais = new ByteArrayInputStream(decr);
RouterInfo peer = new RouterInfo();
peer.readBytes(bais);
_remoteIdentity = peer.getIdentity();
Hash peerKeyHash = new Hash();
peerKeyHash.readBytes(bais);
if (!peerKeyHash.equals(keyHash)) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer tried to spoof!");
return false;
}
Signature rsig = new Signature();
rsig.readBytes(bais);
byte signedData[] = new byte[decr.length - rsig.getData().length];
System.arraycopy(decr, 0, signedData, 0, signedData.length);
boolean valid = _context.dsa().verifySignature(rsig, signedData, _remoteIdentity.getSigningPublicKey());
_remoteInfo = peer;
return valid;
}
protected final static int ESTABLISHMENT_TIMEOUT = 10*1000; // 10 second lag (not necessarily for the entire establish)
public RouterIdentity establishConnection() {
BigInteger myPub = _builder.getMyPublicValue();
try {
_socket.setSoTimeout(ESTABLISHMENT_TIMEOUT);
exchangeKey();
// key exchanged. now say who we are and prove it
boolean ok = identifyStationToStation();
if (!ok)
throw new DataFormatException("Station to station identification failed! MITM?");
else {
if (_log.shouldLog(Log.INFO))
_log.info("TCP connection " + _id + " established with "
+ _remoteIdentity.getHash().toBase64());
_in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv);
_out = new AESOutputStream(_context, new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), _key, _iv);
_socket.setSoTimeout(0);
for (Iterator iter = _remoteInfo.getAddresses().iterator(); iter.hasNext(); ) {
RouterAddress curAddr = (RouterAddress)iter.next();
if (TCPTransport.STYLE.equals(curAddr.getTransportStyle())) {
_remoteAddress = new TCPAddress(curAddr);
break;
}
}
if (_remoteAddress == null) {
throw new DataFormatException("wtf, peer " + _remoteIdentity.calculateHash().toBase64()
+ " unreachable? we already verified!");
}
established();
return _remoteIdentity;
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error establishing connection", ioe);
closeConnection();
return null;
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error establishing connection", dfe);
closeConnection();
return null;
} catch (Throwable t) {
if (_log.shouldLog(Log.ERROR))
_log.error("jrandom is paranoid so we're catching it all during establishConnection", t);
closeConnection();
return null;
}
}
protected void established() { _created = _context.clock().now(); }
/**
* Actually start processing the messages on the connection (and reading
* from the peer, of course). This call should not block.
*
*/
public void runConnection() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Run connection");
_runner = new ConnectionRunner();
Thread t = new I2PThread(_runner);
t.setName("Run Conn [" + _id + "]");
t.setDaemon(true);
t.start();
_reader = new I2NPMessageReader(_context, _in, this, "TCP Read [" + _id + ":" + _transport.getListenPort() + "]");
String name = "TCP Read [" + _ident.calculateHash().toBase64().substring(0,6) + "]";
_reader = new I2NPMessageReader(_context, _in, new MessageHandler(_transport, this), name);
_reader.startReading();
_runner.startRunning();
_started = _context.clock().now();
}
public void setTransport(TCPTransport trans) { _transport = trans; }
/** dont bitch about expiring messages if they don't even last 60 seconds */
private static final long MIN_MESSAGE_LIFETIME_FOR_PENALTY = 60*1000;
public void addMessage(OutNetMessage msg) {
msg.timestamp("TCPConnection.addMessage");
int totalPending = 0;
boolean fail = false;
long beforeAdd = _context.clock().now();
StringBuffer pending = new StringBuffer(64);
List removed = null;
synchronized (_toBeSent) {
for (int i = 0; i < _toBeSent.size(); i++) {
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
if (cur.getExpiration() < beforeAdd) {
if (cur.getLifetime() > MIN_MESSAGE_LIFETIME_FOR_PENALTY) {
fail = true;
break;
} else {
// yeah, it expired, so drop it, but it wasn't our
// fault (since it was almost expired when we got it
if (removed == null)
removed = new ArrayList(2);
removed.add(cur);
_toBeSent.remove(i);
i--;
}
}
}
if (!fail) {
_toBeSent.add(msg);
}
totalPending = _toBeSent.size();
pending.append(totalPending).append(": ");
if (fail) {
for (int i = 0; i < totalPending; i++) {
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
pending.append(cur.getMessageSize()).append(" byte ");
pending.append(cur.getMessageType()).append(" message added");
pending.append(" added ").append(cur.getLifetime()).append(" ms ago, ");
}
}
// the ConnectionRunner.getNext does a wait() until we have messages
_toBeSent.notifyAll();
}
long afterAdd = _context.clock().now();
if (totalPending >= 2)
_context.statManager().addRateData("tcp.queueSize", totalPending-1, 0);
if (removed != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("messages expired on the queue to " + _remoteIdentity.getHash().toBase64()
+ " but they weren't that old: " + removed.size());
for (int i = 0; i < removed.size(); i++) {
OutNetMessage cur = (OutNetMessage)removed.get(i);
msg.timestamp("TCPConnection.addMessage expired but not our fault");
_transport.afterSend(cur, false, false);
}
}
if (fail) {
if (_log.shouldLog(Log.ERROR))
_log.error("messages expired on the queue to " + _remoteIdentity.getHash().toBase64() + ": " + totalPending);
if (_log.shouldLog(Log.WARN))
_log.warn("messages expired on the queue to " + _remoteIdentity.getHash().toBase64() + ": " + pending.toString());
if (_out instanceof BandwidthLimitedOutputStream) {
BandwidthLimitedOutputStream o = (BandwidthLimitedOutputStream)_out;
FIFOBandwidthLimiter.Request req = o.getCurrentRequest();
if (req != null) {
if (_log.shouldLog(Log.ERROR))
_log.error("When the messages timed out, our outbound con requested "
+ req.getTotalOutboundRequested() + " bytes (" + req.getPendingOutboundRequested()
+ " pending) after waiting " + (_context.clock().now() - req.getRequestTime()) + "ms");
}
}
// do we really want to give them a comm error because they're so.damn.slow reading their stream?
_context.profileManager().commErrorOccurred(_remoteIdentity.getHash());
msg.timestamp("TCPConnection.addMessage saw an expired queued message");
_transport.afterSend(msg, false, false);
// should we really be closing a connection if they're that slow?
// yeah, i think we should.
closeConnection();
} else {
long diff = afterAdd - beforeAdd;
if (diff > 500) {
if (_log.shouldLog(Log.WARN))
_log.warn("Lock contention adding a message: " + diff + "ms to "
+ _remoteIdentity.getHash().toBase64() + ": " + totalPending);
}
msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add message with toBeSent.size = " + totalPending + " to " + _remoteIdentity.getHash().toBase64());
if (totalPending <= 0) {
if (_log.shouldLog(Log.ERROR))
_log.error("WTF, total pending after adding " + msg.getMessage().getClass().getName() + " <= 0! " + msg);
}
}
}
void closeConnection() {
/**
* Disconnect from the peer immediately. This stops any related helper
* threads, closes all streams, and fails all pending messages. This can
* be called multiple times safely.
*
*/
public synchronized void closeConnection() {
if (_log.shouldLog(Log.INFO))
_log.info("Connection closed", new Exception("Closed by"));
if (_closed) return;
_closed = true;
if (_remoteIdentity != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Closing the connection to " + _remoteIdentity.getHash().toBase64(),
new Exception("Closed by"));
} else {
if (_socket != null) {
if (_log.shouldLog(Log.WARN)) {
_log.warn("Closing the unestablished connection with "
+ _remoteHost + ":"
+ _remotePort, new Exception("Closed by"));
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Closing the unestablished connection", new Exception("Closed by"));
}
}
if (_reader != null) _reader.stopReading();
if (_runner != null) _runner.stopRunning();
_closed = true;
if (_runner != null)
_runner.stopRunning();
if (_reader != null)
_reader.stopReading();
if (_in != null) try { _in.close(); } catch (IOException ioe) {}
if (_out != null) try { _out.close(); } catch (IOException ioe) {}
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
if (_toBeSent != null) {
long now = _context.clock().now();
synchronized (_toBeSent) {
for (Iterator iter = _toBeSent.iterator(); iter.hasNext(); ) {
OutNetMessage msg = (OutNetMessage)iter.next();
msg.timestamp("TCPTransport.closeConnection caused fail");
if (_log.shouldLog(Log.WARN))
_log.warn("Connection closed to " + _remoteIdentity.getHash().toBase64()
+ " while the message was sitting on the TCP Connection's queue! too slow by: "
+ (now-msg.getExpiration()) + "ms: " + msg);
_transport.afterSend(msg, false, false);
}
_toBeSent.clear();
}
List msgs = clearPendingMessages();
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage msg = (OutNetMessage)msgs.get(0);
_transport.afterSend(msg, false, true, -1);
}
_transport.connectionClosed(this);
}
List getPendingMessages() {
synchronized (_toBeSent) {
return new ArrayList(_toBeSent);
}
}
public void disconnected(I2NPMessageReader reader) {
if (_log.shouldLog(Log.WARN))
_log.warn("Remote disconnected: " + _remoteIdentity.getHash().toBase64());
closeConnection();
}
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToReceive) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message received from " + _remoteIdentity.getHash().toBase64());
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(2*1024);
message.writeBytes(baos);
int size = baos.size();
// this is called by the I2NPMessageReader's thread, so it delays the reading from this peer only
//_log.debug("Delaying inbound for size " + size);
//BandwidthLimiter.getInstance().delayInbound(_remoteIdentity, size);
_transport.messageReceived(message, _remoteIdentity, null, msToReceive, size);
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("How did we read a message that is poorly formatted...", dfe);
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("How did we read a message that can't be written to memory...", ioe);
}
}
public void readError(I2NPMessageReader reader, Exception error) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error reading from stream to " + _remoteIdentity.getHash().toBase64() + ": " + error.getMessage());
if (_log.shouldLog(Log.WARN))
_log.warn("Error reading from stream to " + _remoteIdentity.getHash().toBase64(), error);
}
/**
* If we are taking an absurdly long time to send out a message, drop it
* since we're overloaded.
* Pull off any unsent OutNetMessages from the queue
*
*/
private static final long MAX_LIFETIME_BEFORE_OUTBOUND_EXPIRE = 15*1000;
public List clearPendingMessages() {
List rv = null;
synchronized (_pendingMessages) {
rv = new ArrayList(_pendingMessages);
_pendingMessages.clear();
}
return rv;
}
/**
* Add the given message to the outbound queue, notifying our
* runners that we want to send it.
*
*/
public void addMessage(OutNetMessage msg) {
synchronized (_pendingMessages) {
_pendingMessages.add(msg);
_pendingMessages.notifyAll();
}
}
class ConnectionRunner implements Runnable {
private boolean _running;
public void run() {
_running = true;
while (_running && !_closed) {
OutNetMessage nextMessage = getNext();
if (nextMessage != null) {
boolean sent = doSend(nextMessage);
if (!sent) {
_running = false;
}
}
}
closeConnection();
}
private OutNetMessage getNext() {
OutNetMessage msg = null;
// _running is kept seperate from _closed, since _running refers
// to the ConnectionRunner, while _closed refers to the TCPConnection
// (in case we want to pause running, etc). they both need to be
// checked here, since the connection may be closed before this
// thread even gets started up
while ( (msg == null) && (_running) && (!_closed) ) {
synchronized (_toBeSent) {
if (_toBeSent.size() <= 0) {
try {
_toBeSent.wait();
} catch (InterruptedException ie) {}
}
boolean ancientFound = locked_expireOldMessages();
if (ancientFound) {
_running = false;
return null;
}
if (_toBeSent.size() > 0) {
msg = (OutNetMessage)_toBeSent.remove(0);
}
}
}
return msg;
}
/**
* Fail any messages that have expired on the queue
*
* @return true if any of the messages expired are really really old
* (indicating a hung connection)
*/
private boolean locked_expireOldMessages() {
/**
* Blocking call to retrieve the next pending message. As a side effect,
* this fails messages on the queue that have expired, and in turn never
* returns an expired message.
*
* @return next message or null if the connection has been closed.
*/
OutNetMessage getNextMessage() {
OutNetMessage msg = null;
while ( (msg == null) && (!_closed) ) {
List expired = null;
long now = _context.clock().now();
List timedOut = null;
for (int i = 0; i < _toBeSent.size(); i++) {
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
if (cur.getExpiration() < now) {
if (timedOut == null)
timedOut = new ArrayList(2);
timedOut.add(cur);
_toBeSent.remove(i);
i--;
} else {
long lifetime = cur.timestamp("TCPConnection.runner.locked_expireOldMessages still ok with "
+ (i) + " ahead and " + (_toBeSent.size()-i-1)
+ " behind on the queue");
if (lifetime > MAX_LIFETIME_BEFORE_OUTBOUND_EXPIRE) {
cur.timestamp("TCPConnection.runner.locked_expireOldMessages lifetime too long - " + lifetime);
if (timedOut == null)
timedOut = new ArrayList(2);
timedOut.add(cur);
_toBeSent.remove(i);
synchronized (_pendingMessages) {
for (int i = 0; i < _pendingMessages.size(); i++) {
OutNetMessage cur = (OutNetMessage)_pendingMessages.get(i);
if (cur.getExpiration() < now) {
if (expired == null)
expired = new ArrayList(1);
expired.add(cur);
_pendingMessages.remove(i);
i--;
}
}
}
boolean reallySlowFound = false;
if (timedOut != null) {
for (int i = 0; i < timedOut.size(); i++) {
OutNetMessage failed = (OutNetMessage)timedOut.get(i);
if (_log.shouldLog(Log.WARN))
_log.warn("Message " + i + "/" + timedOut.size()
+ " timed out while sitting on the TCP Connection's queue! was too slow by: "
+ (now-failed.getExpiration()) + "ms to "
+ _remoteIdentity.getHash().toBase64() + ": " + failed);
failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired with " + _toBeSent.size() + " left");
_transport.afterSend(failed, false, false);
if (failed.getLifetime() >= MIN_MESSAGE_LIFETIME_FOR_PENALTY)
reallySlowFound = true;
if (_pendingMessages.size() > 0) {
msg = (OutNetMessage)_pendingMessages.remove(0);
} else {
if (expired == null) {
try {
_pendingMessages.wait();
} catch (InterruptedException ie) {}
}
}
}
return reallySlowFound;
}
/**
* send the message
*
* @return true if the message was sent ok, false if the connection b0rked
*/
private boolean doSend(OutNetMessage msg) {
msg.timestamp("TCPConnection.runner.doSend fetched");
long afterExpire = _context.clock().now();
long remaining = msg.getExpiration() - afterExpire;
if (remaining < 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("Message " + msg.getMessageType() + "/" + msg.getMessageId()
+ " expired before doSend (too slow by " + remaining + "ms)");
_transport.afterSend(msg, false, false);
return true;
}
byte data[] = msg.getMessageData();
if (data == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("message " + msg.getMessageType() + "/" + msg.getMessageId()
+ " expired before it could be sent");
_transport.afterSend(msg, false, false);
return true;
}
msg.timestamp("TCPConnection.runner.doSend before sending "
+ data.length + " bytes");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + data.length + " bytes to "
+ _remoteIdentity.getHash().toBase64());
long exp = msg.getMessage().getMessageExpiration().getTime();
long beforeWrite = 0;
try {
synchronized (_out) {
beforeWrite = _context.clock().now();
_out.write(data);
_out.flush();
if (expired != null) {
for (int i = 0; i < expired.size(); i++) {
OutNetMessage cur = (OutNetMessage)expired.get(i);
sent(cur, false, 0);
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("IO error writing out a " + data.length + " byte message to "
+ _remoteIdentity.getHash().toBase64());
return false;
}
long end = _context.clock().now();
long timeLeft = exp - end;
msg.timestamp("TCPConnection.runner.doSend sent and flushed " + data.length + " bytes");
if (_log.shouldLog(Log.INFO))
_log.info("Message " + msg.getMessageType()
+ " (expiring in " + timeLeft + "ms) sent to "
+ _remoteIdentity.getHash().toBase64() + " from "
+ _context.routerHash().toBase64()
+ " over connection " + _id + " with " + data.length
+ " bytes in " + (end - afterExpire) + "ms (write took "
+ (end - beforeWrite) + "ms, prepare took "
+ (beforeWrite - afterExpire) + "ms)");
long lifetime = msg.getLifetime();
if (lifetime > 10*1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("The processing of the message took way too long (" + lifetime
+ "ms) - time left (" + timeLeft + ") to "
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
}
_transport.afterSend(msg, true, (end-beforeWrite));
if (_log.shouldLog(Log.DEBUG))
_log.debug("doSend - message sent completely: "
+ msg.getMessageSize() + " byte " + msg.getMessageType() + " message to "
+ _remoteIdentity.getHash().toBase64());
if (end - afterExpire > 1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("Actual sending took too long ( " + (end-afterExpire)
+ "ms) sending " + data.length + " bytes to "
+ _remoteIdentity.getHash().toBase64());
}
if (data.length > 2*1024)
_context.statManager().addRateData("tcp.writeTimeLarge", end - beforeWrite, end - beforeWrite);
else
_context.statManager().addRateData("tcp.writeTimeSmall", end - beforeWrite, end - beforeWrite);
if (end-beforeWrite > 1*1024)
_context.statManager().addRateData("tcp.writeTimeSlow", end - beforeWrite, end - beforeWrite);
return true;
}
public void stopRunning() {
_running = false;
// stop the wait(...)
synchronized (_toBeSent) {
_toBeSent.notifyAll();
}
}
return msg;
}
/** How long has this connection been active for? */
public long getLifetime() { return _context.clock().now() - _started; }
void setTransport(TCPTransport transport) { _transport = transport; }
/**
* Configure where this connection should read its data from.
* This should have any necessary bandwidth limiting and
* encryption filters already wrapped in it.
*
*/
void setInputStream(InputStream in) { _in = in; }
/**
* Configure where this connection should write its data to.
* This should have any necessary bandwidth limiting and
* encryption filters already wrapped in it.
*
*/
void setOutputStream(OutputStream out) { _out = out; }
/**
* Configure what underlying socket this connection uses.
* This is only referenced when closing the connection, and
* only if it was set.
*/
void setSocket(Socket socket) { _socket = socket; }
/** Where this connection should write its data to. */
OutputStream getOutputStream() { return _out; }
/** Have we been closed already? */
boolean getIsClosed() { return _closed; }
/**
* The message was sent.
*
* @param msg message in question
* @param ok was the message sent ok?
* @param time how long did it take to write the message?
*/
void sent(OutNetMessage msg, boolean ok, long time) {
_transport.afterSend(msg, ok, true, time);
}
}

View File

@ -0,0 +1,43 @@
package net.i2p.router.transport.tcp;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Build new outbound connections, one at a time. All the heavy lifting is in
* {@link ConnectionBuilder#establishConnection}
*
*/
public class TCPConnectionEstablisher implements Runnable {
private Log _log;
private RouterContext _context;
private TCPTransport _transport;
public TCPConnectionEstablisher(RouterContext ctx, TCPTransport transport) {
_context = ctx;
_transport = transport;
_log = ctx.logManager().getLog(TCPConnectionEstablisher.class);
}
public void run() {
while (true) {
RouterInfo info = _transport.getNextPeer();
ConnectionBuilder cb = new ConnectionBuilder(_context, _transport, info);
TCPConnection con = cb.establishConnection();
if (con != null) {
_transport.connectionEstablished(con);
} else {
_transport.addConnectionErrorMessage(cb.getError());
_context.shitlist().shitlistRouter(info.getIdentity().getHash(), "Unable to contact");
}
// this removes the _pending block on the address and
// identity we attempted to contact. if the peer changed
// identities, any additional _pending blocks will also have
// been cleared above with .connectionEstablished
_transport.establishmentComplete(info);
}
}
}

View File

@ -31,41 +31,52 @@ import net.i2p.util.SimpleTimer;
class TCPListener {
private Log _log;
private TCPTransport _transport;
private TCPAddress _myAddress;
private ServerSocket _socket;
private ListenerRunner _listener;
private RouterContext _context;
private List _pendingSockets;
private List _handlers;
/**
* How many concurrent connection attempts from peers we will try to
* deal with at once.
*/
private static final int CONCURRENT_HANDLERS = 3;
/**
* When things really suck, how long should we wait between attempts to
* listen to the socket?
*/
private final static int MAX_FAIL_DELAY = 5*60*1000;
/** if we're not making progress in 10s, drop 'em */
private final static long HANDLE_TIMEOUT = 10*1000;
/** id generator for the connections */
private static volatile int __handlerId = 0;
public TCPListener(RouterContext context, TCPTransport transport) {
_context = context;
_log = context.logManager().getLog(TCPListener.class);
_myAddress = null;
_transport = transport;
_pendingSockets = new ArrayList(10);
_handlers = new ArrayList(CONCURRENT_HANDLERS);
}
public void setAddress(TCPAddress address) { _myAddress = address; }
public TCPAddress getAddress() { return _myAddress; }
private static final int CONCURRENT_HANDLERS = 3;
public void startListening() {
for (int i = 0; i < CONCURRENT_HANDLERS; i++) {
SocketHandler handler = new SocketHandler();
_handlers.add(handler);
Thread t = new I2PThread(handler);
t.setName("Handler" + i+" [" + _myAddress.getPort()+"]");
TCPAddress addr = _transport.getMyAddress();
if (addr != null) {
_listener = new ListenerRunner(addr);
Thread t = new I2PThread(_listener, "Listener [" + addr.getPort()+"]");
t.setDaemon(true);
t.start();
for (int i = 0; i < CONCURRENT_HANDLERS; i++) {
SocketHandler handler = new SocketHandler();
_handlers.add(handler);
Thread th = new I2PThread(handler, "Handler " + addr.getPort() + ": " + i);
th.setDaemon(true);
th.start();
}
}
_listener = new ListenerRunner();
Thread t = new I2PThread(_listener);
t.setName("Listener [" + _myAddress.getPort()+"]");
t.setDaemon(true);
t.start();
}
public void stopListening() {
@ -75,11 +86,13 @@ class TCPListener {
h.stopHandling();
}
_handlers.clear();
if (_socket != null)
if (_socket != null) {
try {
_socket.close();
_socket = null;
} catch (IOException ioe) {}
}
}
private InetAddress getInetAddress(String host) {
@ -96,13 +109,13 @@ class TCPListener {
}
}
private final static int MAX_FAIL_DELAY = 5*60*1000;
class ListenerRunner implements Runnable {
private boolean _isRunning;
private int _nextFailDelay = 1000;
public ListenerRunner() {
private TCPAddress _myAddress;
public ListenerRunner(TCPAddress address) {
_isRunning = true;
_myAddress = address;
}
public void stopListening() { _isRunning = false; }
@ -111,34 +124,36 @@ class TCPListener {
_log.info("Beginning TCP listener");
int curDelay = 0;
while ( (_isRunning) && (curDelay < MAX_FAIL_DELAY) ) {
while (_isRunning) {
try {
if (_transport.getListenAddressIsValid()) {
_socket = new ServerSocket(_myAddress.getPort(), 5, getInetAddress(_myAddress.getHost()));
} else {
if (_transport.shouldListenToAllInterfaces()) {
_socket = new ServerSocket(_myAddress.getPort());
} else {
InetAddress listenAddr = getInetAddress(_myAddress.getHost());
_socket = new ServerSocket(_myAddress.getPort(), 5, listenAddr);
}
if (_log.shouldLog(Log.INFO))
_log.info("Begin looping for host " + _myAddress.getHost() + ":" + _myAddress.getPort());
curDelay = 0;
loop();
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error listening to tcp connection " + _myAddress.getHost() + ":"
if (_log.shouldLog(Log.WARN))
_log.warn("Error listening to tcp connection " + _myAddress.getHost() + ":"
+ _myAddress.getPort(), ioe);
}
if (_socket != null) {
stopListening();
try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
}
if (_log.shouldLog(Log.ERROR))
_log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again");
if (_log.shouldLog(Log.WARN))
_log.warn("Error listening, waiting " + _nextFailDelay + "ms before we try again");
try { Thread.sleep(_nextFailDelay); } catch (InterruptedException ie) {}
curDelay += _nextFailDelay;
_nextFailDelay *= 5;
if (_nextFailDelay > MAX_FAIL_DELAY)
_nextFailDelay = MAX_FAIL_DELAY;
}
if (_log.shouldLog(Log.ERROR))
_log.error("CANCELING TCP LISTEN. delay = " + curDelay);
@ -242,60 +257,31 @@ class TCPListener {
}
}
/** if we're not making progress in 30s, drop 'em */
private final static long HANDLE_TIMEOUT = 10*1000;
private static volatile int __handlerId = 0;
private class TimedHandler implements SimpleTimer.TimedEvent {
private int _handlerId;
private Socket _socket;
private boolean _wasSuccessful;
private boolean _receivedIdentByte;
public TimedHandler(Socket socket) {
_socket = socket;
_wasSuccessful = false;
_handlerId = ++__handlerId;
_receivedIdentByte = false;
}
public int getHandlerId() { return _handlerId; }
public void handle() {
SimpleTimer.getInstance().addEvent(TimedHandler.this, HANDLE_TIMEOUT);
try {
OutputStream os = _socket.getOutputStream();
os.write(SocketCreator.I2P_FLAG);
os.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("listener: I2P flag sent");
int val = _socket.getInputStream().read();
if (_log.shouldLog(Log.DEBUG))
_log.debug("listener: Value read: [" + val + "] == flag? [" + SocketCreator.I2P_FLAG + "]");
if (val == -1)
throw new UnsupportedOperationException("Peer disconnected while we were looking for the I2P flag");
if (val != SocketCreator.I2P_FLAG) {
throw new UnsupportedOperationException("Peer connecting to us didn't send the right I2P byte [" + val + "]");
}
_receivedIdentByte = true;
TCPConnection c = new RestrictiveTCPConnection(_context, _socket, false);
_transport.handleConnection(c, null);
ConnectionHandler ch = new ConnectionHandler(_context, _transport, _socket);
TCPConnection con = ch.receiveConnection();
if (con != null) {
_wasSuccessful = true;
_transport.connectionEstablished(con);
} else if (ch.getTestComplete()) {
// not a connection, but we verified the test
_wasSuccessful = true;
} catch (UnsupportedOperationException uoe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Failed to state they wanted to connect as I2P", uoe);
_wasSuccessful = false;
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error listening to the peer", ioe);
_wasSuccessful = false;
} catch (Throwable t) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error handling", t);
_wasSuccessful = false;
}
if (!_wasSuccessful)
_transport.addConnectionErrorMessage(ch.getError());
}
public boolean wasSuccessful() { return _wasSuccessful; }
public boolean receivedIdentByte() { return _receivedIdentByte; }
/**
* Called after a timeout period - if we haven't already established the
@ -307,13 +293,8 @@ class TCPListener {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle successful");
} else {
if (receivedIdentByte()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to handle in the time allotted");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning");
}
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to handle in the time allotted");
try { _socket.close(); } catch (IOException ioe) {}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,140 @@
<html><body>
<p>Implements the transport for communicating with other routers via TCP/IP.</p>
<h1>Connection protocol</h1>
<p>The protocol used to establish the connection between the peers is
implemented in the {@link net.i2p.router.transport.tcp.ConnectionBuilder}
for "Alice", the initiator, and in
{@link net.i2p.router.transport.tcp.ConnectionHandler} for "Bob", the
receiving peer. <i>(+ implies concatenation)</i></p>
<h2>Common case:</h2>
<p><b>1) </b> <i>Alice to Bob</i>: <br />
<code>#bytesFollowing + #versions + v1 [+ v2 [etc]] + tag? + tagData + properties</code></p>
<p><b>2) </b> <i>Bob to Alice</i>: <br />
<code>#bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties</code></p>
<ul>
<li><code>#bytesFollowing</code> is a 2 byte unsigned integer specifying how many
bytes there are (after the current pair) in the line sent. 0xFFFF is reserved</li>
<li><code>#versions</code> is a 1 byte unsigned integer specifying how many
acceptable 1 byte version numbers follow (preferred value first).</li>
<li><code>v1</code> (etc) is a 1 byte unsigned integer specifying a protocol
version. The value 0x0 is not allowed.</li>
<li><code>tag?</code> is a 1 byte value specifying whether a tag follows - 0x0 means
no tag follows, 0x1 means a 32 byte tag follows.</li>
<li><code>tagData</code> is a 32 byte tag, if necessary</li>
<li><code>properties</code> is a name=value mapping, formatted as the other I2P
mappings (via {@link net.i2p.data.DataHelper#readProperties})</li>
<li><code>versionOk</code> is a 1 byte value specifying the protocol version
that is agreed upon, or 0x0 if no compatible protocol versions are available.</li>
<li><code>#bytesIP</code> is a 2 byte unsigned integer specifying how many bytes
following make up the IP address</li>
<li><code>IP</code> is made up of <code>#bytesIP</code> bytes formatting the
peer who established the connection's IP address as a string (e.g. "192.168.1.1")</li>
<li><code>tagOk?</code> is a 1 byte value specifying whether the tag provided
is available for use - 0x0 means no, 0x1 means yes.</li>
<li><code>nonce</code> is a 4 byte random value</li>
</ul>
<p>Whether or not the <code>tagData</code> is specified by Alice and is accepted
by Bob determines which of the scenarios below are used. In addition, the IP
address provided by Bob gives Alice the opportunity to fire up a socket listener
on that interface and include it in her list of reachable addresses. The
<code>properties</code> mappings are left for future expansion.</p>
<h2>Connection establishment with a valid tag:</h2>
<p>With a valid <code>tag</code> and <code>nonce</code> received, both Alice and
Bob load up the previously negotiated <code>sessionKey</code> and set the
<code>iv</code> to the first 16 bytes of <code>H(tag + nonce)</code>. The
remainder of the communication is AES256 encrypted per
{@link net.i2p.crypto.AESInputStream} and {@link net.i2p.crypto.AESOutputStream}</p>
<p><b>3) </b> <i>Alice to Bob</i>: <br />
<code>H(nonce)</code></p>
<p><b>4) </b> <i>Bob to Alice</i>: <br />
<code>H(tag)</code></p>
<p><b>5) </b> If the hashes are not correct, disconnect immediately and do not
consume the tag</p>
<p><b>6) </b> <i>Alice to Bob</i>: <br />
<code>routerInfo + currentTime + H(routerInfo + currentTime + nonce + tag)</code></p>
<p><b>7) </b> Bob should now verify that he can establish a connection to her through one of the
routerAddresses specified in her RouterInfo. The testing process is described below.</p>
<p><b>8) </b> <i>Bob to Alice</i>: <br />
<code>routerInfo + status + properties + H(routerInfo + status + properties + nonce + tag)</code></p>
<p><b>9) </b> If the <code>status</code> is ok, both Alice and Bob consume the
<code>tagData</code>, updating the next tag to be <code>H(E(nonce + tag, sessionKey))</code>.
Otherwise, both sides disconnect and do not consume the tag. In addition, on error the
<code>properties</code> mapping has a more detailed reason under the key "MESSAGE".</p>
<ul>
<li><code>H(x)</code> is the SHA256 hash of x, formatted per {@link net.i2p.data.Hash#writeBytes}.</li>
<li><code>routerInfo</code> is the serialization of the local router's info
per {@link net.i2p.data.RouterInfo#writeBytes}.</li>
<li><code>currentTime</code> is what the local router thinks the current network time
is, formatted per {@link net.i2p.data.DataHelper#writeDate}.</li>
<li><code>status</code> is a 1 byte value:<ul>
<li><b>0x0</b> means OK</li>
<li><b>0x1</b> means Alice was not reachable</li>
<li><b>0x2</b> means the clock was skewed (Bob's current time may be available
in the properties mapping under "SKEW", formatted as "yyyyMMddhhmmssSSS",
per {@link java.text.SimpleDateFormat}).</li>
<li><b>0x3</b> means the signature is invalid (only used by steps 9 and 11 below)</li>
<li>Other values are currently undefined (yet fatal) errors</li>
</ul></li>
</ul>
<h2>Connection establishment without a vald tag:</h2>
<p><b>3) </b> <i>Alice to Bob</i> <br />
X</p>
<p><b>4) </b> <i>Bob to Alice</i> <br />
Y</p>
<p><b>5) </b> Both sides complete the Diffie-Hellman exchange, setting the
<code>sessionKey</code> to the first 32 bytes of the result (e.g. (X^y mod p)),
<code>iv</code> to the next 16 bytes, and the <code>nextTag</code> to the 32
bytes after that. The rest of the data is AES256 encrypted with those settings per
{@link net.i2p.crypto.AESInputStream} and {@link net.i2p.crypto.AESOutputStream}</p>
<p><b>6) </b> <i>Alice to Bob</i> <br />
<code>H(nonce)</code></p>
<p><b>7) </b> <i>Bob to Alice</i> <br />
<code>H(nextTag)</code></p>
<p><b>8) </b> If they disagree, disconnect immediately and do not persist the tags or keys</p>
<p><b>9) </b> <i>Alice to Bob</i> <br />
<code>routerInfo + currentTime
+ S(routerInfo + currentTime + nonce + nextTag, routerIdent.signingKey)</code></p>
<p><b>10) </b> Bob should now verify that he can establish a connection to her through one of the
routerAddresses specified in her RouterInfo. The testing process is described below.</p>
<p><b>11) </b> <i>Bob to Alice</i> <br />
<code>routerInfo + status + properties
+ S(routerInfo + status + properties + nonce + nextTag, routerIdent.signingKey)</code></p>
<p><b>12) </b> If the signature matches on both sides and <code>status</code> is ok, both sides
save the <code>sessionKey</code> negotiated as well as the <code>nextTag</code>.
Otherwise, the keys and tags are discarded and both sides drop the connection.</p>
<ul>
<li><code>X</code> is a 256 byte unsigned integer in 2s complement, representing
</code>g^x mod p</code> (where <code>g</code> and <code>p</code> are defined
in {@link net.i2p.crypto.CryptoConstants} and x is a randomly chosen value</li>
<li><code>Y</code> is a 256 byte unsigned integer in 2s complement, representing
</code>g^y mod p</code> (where <code>g</code> and <code>p</code> are defined
in {@link net.i2p.crypto.CryptoConstants} and y is a randomly chosen value</li>
<li><code>S(val, key)</code> is the DSA signature of the <code>val</code> using the
given signing <code>key</code> (in this case, the router's signing keys to provide
authentication that they are who they say they are). The signature is formatted
per {@link net.i2p.data.Signature}.</li>
</ul>
<h2>Peer testing</h2>
<p>As mentioned in steps 7 and 10 above, Bob should verify that Alice is reachable
to prevent a restricted route from being formed (he may decide not to do this once
I2P supports restricted routes)</p>
<p><b>1) </b> <i>Bob to Alice</i> <br />
<code>0xFFFF + #versions + v1 [+ v2 [etc]] + properties</p>
<p><b>2) </b> <i>Alice to Bob</i> <br />
<code>0xFFFF + versionOk + #bytesIP + IP + currentTime + properties</code></p>
<p><b>3) </b> Both sides close the socket</p>
</body></html>