diff --git a/build.xml b/build.xml index dad90fdb0d..57f8bddf71 100644 --- a/build.xml +++ b/build.xml @@ -217,24 +217,15 @@ - - - - - - - - - diff --git a/history.txt b/history.txt index 7a1c666222..1733c6e36d 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,14 @@ -$Id: history.txt,v 1.16 2004/09/08 21:26:43 jrandom Exp $ +$Id: history.txt,v 1.17 2004/09/12 22:08:17 jrandom Exp $ + +2004-09-16 jrandom + * Refactor the TCP transport to deal with changing identities gracefully, + and to prevent some wasted effort by keeping track of what host+port + combinations we are connected to (rather than just the identities). Also + catch a few configuration errors earlier. + * Removed no longer relevent methods from the Transport API that were + exposing ideas that probably shouldn't be exposed. + * Removed the 0.4.0.1 specific files from i2pupdate.zip (relating to script + updates) 2004-09-13 jrandom * Update for the SDK reconnection to deal with overflow. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index ac4eb2adf8..966ba7aa18 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.29 $ $Date: 2004/09/08 21:26:43 $"; + public final static String ID = "$Revision: 1.30 $ $Date: 2004/09/12 22:08:16 $"; public final static String VERSION = "0.4.0.1"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/Transport.java b/router/java/src/net/i2p/router/transport/Transport.java index 3846c8ffc3..1c9a24eb5e 100644 --- a/router/java/src/net/i2p/router/transport/Transport.java +++ b/router/java/src/net/i2p/router/transport/Transport.java @@ -31,9 +31,7 @@ public interface Transport { public void send(OutNetMessage msg); public RouterAddress startListening(); public void stopListening(); - public void rotateAddresses(); public Set getCurrentAddresses(); - public void addAddressInfo(Properties infoForNewAddress); public void setListener(TransportEventListener listener); public String getStyle(); diff --git a/router/java/src/net/i2p/router/transport/TransportBid.java b/router/java/src/net/i2p/router/transport/TransportBid.java index 6c926c6ef9..bc7ff23669 100644 --- a/router/java/src/net/i2p/router/transport/TransportBid.java +++ b/router/java/src/net/i2p/router/transport/TransportBid.java @@ -67,6 +67,7 @@ public class TransportBid { */ public Date getExpiration() { return _bidExpiration; } public void setExpiration(Date expirationDate) { _bidExpiration = expirationDate; } + public void setExpiration(long expirationDate) { setExpiration(new Date(expirationDate)); } /** * Specifies the transport that offered this bid diff --git a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java index 857b8aa85f..5621095fc6 100644 --- a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java @@ -15,11 +15,13 @@ import java.io.IOException; import java.math.BigInteger; import java.net.Socket; import java.util.Date; +import java.util.Iterator; import net.i2p.crypto.AESInputStream; import net.i2p.crypto.AESOutputStream; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; +import net.i2p.data.RouterAddress; import net.i2p.data.RouterIdentity; import net.i2p.router.Router; import net.i2p.router.RouterContext; @@ -298,6 +300,18 @@ class RestrictiveTCPConnection extends TCPConnection { _out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv); _socket.setSoTimeout(0); success = _context.clock().now(); + + for (Iterator iter = _remoteInfo.getAddresses().iterator(); iter.hasNext(); ) { + RouterAddress curAddr = (RouterAddress)iter.next(); + if (TCPTransport.STYLE.equals(curAddr.getTransportStyle())) { + _remoteAddress = new TCPAddress(curAddr); + break; + } + } + if (_remoteAddress == null) { + throw new DataFormatException("wtf, no TCP addresses? we already verified!"); + } + established(); return _remoteIdentity; diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPAddress.java b/router/java/src/net/i2p/router/transport/tcp/TCPAddress.java index d081451e64..e21956a4c0 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPAddress.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPAddress.java @@ -9,6 +9,7 @@ package net.i2p.router.transport.tcp; */ import java.net.InetAddress; +import java.net.UnknownHostException; import net.i2p.data.DataHelper; import net.i2p.data.RouterAddress; @@ -28,9 +29,18 @@ public class TCPAddress { public final static String PROP_HOST = "host"; public TCPAddress(String host, int port) { - _host = host; - _port = port; - _addr = null; + try { + if (host != null) { + InetAddress iaddr = InetAddress.getByName(host); + _host = iaddr.getHostAddress(); + _addr = iaddr; + } + _port = port; + } catch (UnknownHostException uhe) { + _host = null; + _port = -1; + _addr = null; + } } public TCPAddress() { @@ -40,24 +50,34 @@ public class TCPAddress { } public TCPAddress(InetAddress addr, int port) { - _host = addr.getHostName(); + if (addr != null) + _host = addr.getHostAddress(); _addr = addr; _port = port; } public TCPAddress(RouterAddress addr) { if (addr == null) throw new IllegalArgumentException("Null router address"); - _host = addr.getOptions().getProperty(PROP_HOST); - String port = addr.getOptions().getProperty(PROP_PORT); - if ( (port != null) && (port.trim().length() > 0) ) { - try { - _port = Integer.parseInt(port); - } catch (NumberFormatException nfe) { - _log.error("Invalid port [" + port + "]", nfe); - _port = -1; - } - } else { - _port = -1; - } + String host = addr.getOptions().getProperty(PROP_HOST); + try { + InetAddress iaddr = InetAddress.getByName(host); + _host = iaddr.getHostAddress(); + _addr = iaddr; + + String port = addr.getOptions().getProperty(PROP_PORT); + if ( (port != null) && (port.trim().length() > 0) ) { + try { + _port = Integer.parseInt(port); + } catch (NumberFormatException nfe) { + _log.error("Invalid port [" + port + "]", nfe); + _port = -1; + } + } else { + _port = -1; + } + } catch (UnknownHostException uhe) { + _host = null; + _port = -1; + } } public String getHost() { return _host; } @@ -84,6 +104,8 @@ public class TCPAddress { } } + public String toString() { return _host + ":" + _port; } + public int hashCode() { int rv = 0; rv += _port; @@ -96,12 +118,13 @@ public class TCPAddress { public boolean equals(Object val) { if ( (val != null) && (val instanceof TCPAddress) ) { TCPAddress addr = (TCPAddress)val; - if (getAddress().getHostAddress() != null) + if ( (_addr != null) && (_addr.getHostAddress() != null) ) { return DataHelper.eq(getAddress().getHostAddress(), addr.getAddress().getHostAddress()) && (getPort() == addr.getPort()); - else + } else { return DataHelper.eq(getHost(), addr.getHost()) && (getPort() == addr.getPort()); + } } return false; } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 73e30f5ac9..33906cac64 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.math.BigInteger; +import java.net.InetAddress; import java.net.Socket; import java.util.ArrayList; import java.util.Iterator; @@ -26,6 +27,7 @@ import net.i2p.data.ByteArray; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Hash; +import net.i2p.data.RouterAddress; import net.i2p.data.RouterIdentity; import net.i2p.data.RouterInfo; import net.i2p.data.SessionKey; @@ -70,6 +72,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { private boolean _weInitiated; private long _created; protected RouterContext _context; + protected TCPAddress _remoteAddress; public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException { _context = context; @@ -101,8 +104,13 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { // doesn't, so we've got to check & cache it here if we want to log it later. (kaffe et al are acting per // spec, btw) try { - _remoteHost = s.getInetAddress() + ""; + InetAddress addr = s.getInetAddress(); + if (addr != null) { + _remoteHost = addr.getHostAddress(); + } _remotePort = s.getPort(); + if (locallyInitiated) + _remoteAddress = new TCPAddress(_remoteHost, _remotePort); } catch (NullPointerException npe) { throw new IOException("kaffe is being picky since the socket closed too fast..."); } @@ -110,6 +118,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { _log.info("Connected with peer: " + _remoteHost + ":" + _remotePort); } + public TCPAddress getRemoteAddress() { return _remoteAddress; } + /** how long has this connection been around for, or -1 if it isn't established yet */ public long getLifetime() { if (_created > 0) @@ -215,6 +225,18 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { _in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv); _out = new AESOutputStream(_context, new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), _key, _iv); _socket.setSoTimeout(0); + + for (Iterator iter = _remoteInfo.getAddresses().iterator(); iter.hasNext(); ) { + RouterAddress curAddr = (RouterAddress)iter.next(); + if (TCPTransport.STYLE.equals(curAddr.getTransportStyle())) { + _remoteAddress = new TCPAddress(curAddr); + break; + } + } + if (_remoteAddress == null) { + throw new DataFormatException("wtf, peer " + _remoteIdentity.calculateHash().toBase64() + + " unreachable? we already verified!"); + } established(); return _remoteIdentity; } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index 89f64ff63a..f3b25cf476 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -42,12 +42,17 @@ public class TCPTransport extends TransportImpl { private Log _log; public final static String STYLE = "TCP"; private List _listeners; - private Map _connections; // routerIdentity --> List of TCPConnection + /** RouterIdentity to List of TCPConnections */ + private Map _connections; + /** TCPAddress (w/ IP not hostname) to List of TCPConnections */ + private Map _connectionAddresses; private String _listenHost; private int _listenPort; private RouterAddress _address; + private TCPAddress _tcpAddress; private boolean _listenAddressIsValid; - private Map _msgs; // H(ident) --> PendingMessages for unestablished connections + /** H(ident) to PendingMessages for unestablished connections */ + private Map _msgs; private boolean _running; private int _numConnectionEstablishers; @@ -76,6 +81,7 @@ public class TCPTransport extends TransportImpl { _listeners = new ArrayList(); _connections = new HashMap(); + _connectionAddresses = new HashMap(); _msgs = new HashMap(); _address = address; if (address != null) { @@ -86,6 +92,7 @@ public class TCPTransport extends TransportImpl { } catch (NumberFormatException nfe) { _log.error("Invalid port: " + portStr + " Address: \n" + address, nfe); } + _tcpAddress = new TCPAddress(_listenHost, _listenPort); } _listenAddressIsValid = false; try { @@ -189,6 +196,38 @@ public class TCPTransport extends TransportImpl { RouterAddress addr = (RouterAddress)iter.next(); startEstablish = _context.clock().now(); if (getStyle().equals(addr.getTransportStyle())) { + + TCPAddress tcpAddr = new TCPAddress(addr); + synchronized (_connectionAddresses) { + if (_connectionAddresses.containsKey(tcpAddr)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("We already have a connection to another router at " + tcpAddr); + _context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Duplicate TCP address (changed identities?)"); + _context.netDb().fail(target.getIdentity().getHash()); + return false; + } + } + + if (tcpAddr.equals(_tcpAddress)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Peer " + target.getIdentity().getHash().toBase64() + + " has OUR address [" + tcpAddr + "]"); + _context.profileManager().commErrorOccurred(target.getIdentity().getHash()); + _context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Points at us"); + _context.netDb().fail(target.getIdentity().getHash()); + return false; + } + + if (!tcpAddr.isPubliclyRoutable() && false) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Peer " + target.getIdentity().getHash().toBase64() + + " has an unroutable address [" + tcpAddr + "]"); + _context.profileManager().commErrorOccurred(target.getIdentity().getHash()); + _context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Unroutable address"); + _context.netDb().fail(target.getIdentity().getHash()); + return false; + } + if (_log.shouldLog(Log.DEBUG)) _log.debug("Establishing a connection with address " + addr); Socket s = createSocket(addr); @@ -384,6 +423,7 @@ public class TCPTransport extends TransportImpl { allCons.add(con); } } + _connectionAddresses.clear(); } for (Iterator iter = allCons.iterator(); iter.hasNext(); ) { TCPConnection con = (TCPConnection)iter.next(); @@ -420,6 +460,13 @@ public class TCPTransport extends TransportImpl { _connections.remove(iter.next()); } } + + TCPAddress address = con.getRemoteAddress(); + if (address != null) { + synchronized (_connectionAddresses) { + _connectionAddresses.remove(address); + } + } if (_log.shouldLog(Log.INFO)) _log.info(buf.toString()); //if (con.getRemoteRouterIdentity() != null) @@ -429,6 +476,30 @@ public class TCPTransport extends TransportImpl { con.setTransport(this); if (_log.shouldLog(Log.DEBUG)) _log.debug("Before establishing connection"); + TCPAddress remAddr = con.getRemoteAddress(); + if (remAddr != null) { + synchronized (_connectionAddresses) { + if (_connectionAddresses.containsKey(remAddr)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("refusing connection from " + remAddr + " as it is a dup"); + con.closeConnection(); + return false; + } + } + + if (_tcpAddress.equals(remAddr)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("refusing connection to ourselves..."); + _context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Our old address"); + _context.netDb().fail(target.getIdentity().getHash()); + con.closeConnection(); + return false; + } + } else { + //if (_log.shouldLog(Log.WARN)) + // _log.warn("Why do we not have a remoteAddress for " + con, new Exception("hrm")); + } + long start = _context.clock().now(); RouterIdentity ident = con.establishConnection(); long afterEstablish = _context.clock().now(); @@ -440,18 +511,26 @@ public class TCPTransport extends TransportImpl { return false; } + if (ident.equals(_context.router().getRouterInfo().getIdentity())) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping established connection with *cough* ourselves: listenHost=[" + + _tcpAddress.getHost() + "] listenPort=[" +_tcpAddress.getPort()+ "] remoteHost=[" + + remAddr.getHost() + "] remPort=[" + remAddr.getPort() + "]"); + con.closeConnection(); + return false; + } + if (_log.shouldLog(Log.INFO)) _log.info("Connection established with " + ident + " after " + (afterEstablish-start) + "ms"); if (target != null) { if (!target.getIdentity().equals(ident)) { - _context.statManager().updateFrequency("tcp.acceptFailureFrequency"); - if (_log.shouldLog(Log.ERROR)) - _log.error("Target changed identities!!! was " + target.getIdentity().getHash().toBase64() + ", now is " + ident.getHash().toBase64() + "! DROPPING CONNECTION"); - con.closeConnection(); + //_context.statManager().updateFrequency("tcp.acceptFailureFrequency"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Target changed identities! was " + target.getIdentity().getHash().toBase64() + ", now is " + ident.getHash().toBase64() + "!"); // remove the old ref, since they likely just created a new identity _context.netDb().fail(target.getIdentity().getHash()); _context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Peer changed identities"); - return false; + //return false; } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Target is the same as who we connected with"); @@ -459,14 +538,34 @@ public class TCPTransport extends TransportImpl { } if (ident != null) { Set toClose = new HashSet(4); - List toAdd = new LinkedList(); + List toAdd = new ArrayList(1); + List cons = null; synchronized (_connections) { if (!_connections.containsKey(ident)) _connections.put(ident, new ArrayList(2)); - List cons = (List)_connections.get(ident); + cons = (List)_connections.get(ident); if (cons.size() > 0) { - if (_log.shouldLog(Log.WARN)) + if (_log.shouldLog(Log.WARN)) { _log.warn("Attempted to open additional connections with " + ident.getHash() + ": closing older connections", new Exception("multiple cons")); + + StringBuffer buf = new StringBuffer(128); + if (remAddr == null) + remAddr = con.getRemoteAddress(); + buf.append("Connection address: [").append(remAddr.toString()).append(']'); + synchronized (_connectionAddresses) { + if (_connectionAddresses.containsKey(remAddr)) { + buf.append(" NOT KNOWN in: "); + } else { + buf.append(" KNOWN IN: "); + } + for (Iterator iter = _connectionAddresses.keySet().iterator(); iter.hasNext(); ) { + TCPAddress curAddr = (TCPAddress)iter.next(); + buf.append('[').append(curAddr.toString()).append("] "); + } + } + _log.warn(buf.toString()); + } + while (cons.size() > 0) { TCPConnection oldCon = (TCPConnection)cons.remove(0); toAdd.addAll(oldCon.getPendingMessages()); @@ -487,6 +586,10 @@ public class TCPTransport extends TransportImpl { } } + synchronized (_connectionAddresses) { + _connectionAddresses.put(con.getRemoteAddress(), cons); + } + if (toAdd.size() > 0) { for (Iterator iter = toAdd.iterator(); iter.hasNext(); ) { OutNetMessage msg = (OutNetMessage)iter.next(); @@ -616,7 +719,9 @@ public class TCPTransport extends TransportImpl { refetchedCon = _context.clock().now(); if (con == null) { if (_log.shouldLog(Log.ERROR)) - _log.error("Connection established but we can't find the connection? wtf! peer = " + pending.getPeer()); + _log.error("Connection established to " + pending.getPeer().toBase64() + " but they aren't who we wanted"); + _context.shitlist().shitlistRouter(pending.getPeer(), "Old address of a new peer"); + failPending(pending); } else { _context.shitlist().unshitlistRouter(pending.getPeer()); sendPending(con, pending); @@ -690,7 +795,7 @@ public class TCPTransport extends TransportImpl { _log.debug("Adding a pending to existing " + target.toBase64()); } int level = Log.INFO; - if (msgs.getMessageCount() > 1) + if (msgs.getMessageCount() > 2) level = Log.WARN; if (_log.shouldLog(level)) _log.log(level, "Add message to " + target.toBase64() + ", making a total of " + msgs.getMessageCount() + " for them, with another " + (_msgs.size() -1) + " peers pending establishment"); @@ -754,12 +859,20 @@ public class TCPTransport extends TransportImpl { _log.info("Connection established, now queueing up " + pending.getMessageCount() + " messages to be sent"); synchronized (_msgs) { _msgs.remove(pending.getPeer()); - + } + if (pending.getPeer().equals(con.getRemoteRouterIdentity().calculateHash())) { OutNetMessage msg = null; while ( (msg = pending.getNextMessage()) != null) { msg.timestamp("TCPTransport.sendPending to con.addMessage"); con.addMessage(msg); } + } else { + // we connected to someone we didn't try to connect to... + OutNetMessage msg = null; + while ( (msg = pending.getNextMessage()) != null) { + msg.timestamp("TCPTransport.sendPending connected to a different peer"); + afterSend(msg, false, false); + } } } @@ -790,7 +903,7 @@ public class TCPTransport extends TransportImpl { private ConnEstablisher _establisher; public PendingMessages(RouterInfo peer) { - _messages = new LinkedList(); + _messages = new ArrayList(2); _peerInfo = peer; _peer = peer.getIdentity().getHash(); _establisher = null;