2004-09-16 jrandom

* Refactor the TCP transport to deal with changing identities gracefully,
      and to prevent some wasted effort by keeping track of what host+port
      combinations we are connected to (rather than just the identities).  Also
      catch a few configuration errors earlier.
    * Removed no longer relevent methods from the Transport API that were
      exposing ideas that probably shouldn't be exposed.
    * Removed the 0.4.0.1 specific files from i2pupdate.zip (relating to script
      updates)
This commit is contained in:
jrandom
2004-09-16 23:55:12 +00:00
committed by zzz
parent 67064012c9
commit 05acf32f39
9 changed files with 219 additions and 47 deletions

View File

@ -217,24 +217,15 @@
</target> </target>
<target name="updater" depends="build"> <target name="updater" depends="build">
<delete dir="pkg-temp" /> <delete dir="pkg-temp" />
<copy file="build/heartbeat.jar" todir="pkg-temp/lib/" />
<copy file="build/i2p.jar" todir="pkg-temp/lib/" /> <copy file="build/i2p.jar" todir="pkg-temp/lib/" />
<copy file="build/i2ptunnel.jar" todir="pkg-temp/lib/" /> <copy file="build/i2ptunnel.jar" todir="pkg-temp/lib/" />
<copy file="build/mstreaming.jar" todir="pkg-temp/lib/" /> <copy file="build/mstreaming.jar" todir="pkg-temp/lib/" />
<copy file="build/router.jar" todir="pkg-temp/lib/" /> <copy file="build/router.jar" todir="pkg-temp/lib/" />
<copy file="build/routerconsole.jar" todir="pkg-temp/lib/" /> <copy file="build/routerconsole.jar" todir="pkg-temp/lib/" />
<copy file="build/sam.jar" todir="pkg-temp/lib/" />
<copy file="build/systray.jar" todir="pkg-temp/lib" />
<copy file="build/i2ptunnel.war" todir="pkg-temp/webapps/" /> <copy file="build/i2ptunnel.war" todir="pkg-temp/webapps/" />
<copy file="build/routerconsole.war" todir="pkg-temp/webapps/" /> <copy file="build/routerconsole.war" todir="pkg-temp/webapps/" />
<copy file="history.txt" todir="pkg-temp/" /> <copy file="history.txt" todir="pkg-temp/" />
<copy file="hosts.txt" todir="pkg-temp/" /> <copy file="hosts.txt" todir="pkg-temp/" />
<copy file="installer/resources/install_i2p_service_winnt.bat" todir="pkg-temp/" />
<copy file="installer/resources/uninstall_i2p_service_winnt.bat" todir="pkg-temp/" />
<copy file="installer/resources/i2prouter" todir="pkg-temp/" />
<copy file="installer/resources/i2prouter.bat" todir="pkg-temp/" />
<copy file="installer/resources/i2prouter_win9x.bat" todir="pkg-temp/" />
<copy file="installer/resources/wrapper.config" todir="pkg-temp/" />
<zip destfile="i2pupdate.zip" basedir="pkg-temp" /> <zip destfile="i2pupdate.zip" basedir="pkg-temp" />
</target> </target>
<taskdef name="izpack" classpath="${basedir}/installer/lib/izpack/standalone-compiler.jar" classname="com.izforge.izpack.ant.IzPackTask" /> <taskdef name="izpack" classpath="${basedir}/installer/lib/izpack/standalone-compiler.jar" classname="com.izforge.izpack.ant.IzPackTask" />

View File

@ -1,4 +1,14 @@
$Id: history.txt,v 1.16 2004/09/08 21:26:43 jrandom Exp $ $Id: history.txt,v 1.17 2004/09/12 22:08:17 jrandom Exp $
2004-09-16 jrandom
* Refactor the TCP transport to deal with changing identities gracefully,
and to prevent some wasted effort by keeping track of what host+port
combinations we are connected to (rather than just the identities). Also
catch a few configuration errors earlier.
* Removed no longer relevent methods from the Transport API that were
exposing ideas that probably shouldn't be exposed.
* Removed the 0.4.0.1 specific files from i2pupdate.zip (relating to script
updates)
2004-09-13 jrandom 2004-09-13 jrandom
* Update for the SDK reconnection to deal with overflow. * Update for the SDK reconnection to deal with overflow.

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.29 $ $Date: 2004/09/08 21:26:43 $"; public final static String ID = "$Revision: 1.30 $ $Date: 2004/09/12 22:08:16 $";
public final static String VERSION = "0.4.0.1"; public final static String VERSION = "0.4.0.1";
public final static long BUILD = 1; public final static long BUILD = 2;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION); System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -31,9 +31,7 @@ public interface Transport {
public void send(OutNetMessage msg); public void send(OutNetMessage msg);
public RouterAddress startListening(); public RouterAddress startListening();
public void stopListening(); public void stopListening();
public void rotateAddresses();
public Set getCurrentAddresses(); public Set getCurrentAddresses();
public void addAddressInfo(Properties infoForNewAddress);
public void setListener(TransportEventListener listener); public void setListener(TransportEventListener listener);
public String getStyle(); public String getStyle();

View File

@ -67,6 +67,7 @@ public class TransportBid {
*/ */
public Date getExpiration() { return _bidExpiration; } public Date getExpiration() { return _bidExpiration; }
public void setExpiration(Date expirationDate) { _bidExpiration = expirationDate; } public void setExpiration(Date expirationDate) { _bidExpiration = expirationDate; }
public void setExpiration(long expirationDate) { setExpiration(new Date(expirationDate)); }
/** /**
* Specifies the transport that offered this bid * Specifies the transport that offered this bid

View File

@ -15,11 +15,13 @@ import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.Socket; import java.net.Socket;
import java.util.Date; import java.util.Date;
import java.util.Iterator;
import net.i2p.crypto.AESInputStream; import net.i2p.crypto.AESInputStream;
import net.i2p.crypto.AESOutputStream; import net.i2p.crypto.AESOutputStream;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity; import net.i2p.data.RouterIdentity;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
@ -298,6 +300,18 @@ class RestrictiveTCPConnection extends TCPConnection {
_out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv); _out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv);
_socket.setSoTimeout(0); _socket.setSoTimeout(0);
success = _context.clock().now(); success = _context.clock().now();
for (Iterator iter = _remoteInfo.getAddresses().iterator(); iter.hasNext(); ) {
RouterAddress curAddr = (RouterAddress)iter.next();
if (TCPTransport.STYLE.equals(curAddr.getTransportStyle())) {
_remoteAddress = new TCPAddress(curAddr);
break;
}
}
if (_remoteAddress == null) {
throw new DataFormatException("wtf, no TCP addresses? we already verified!");
}
established(); established();
return _remoteIdentity; return _remoteIdentity;

View File

@ -9,6 +9,7 @@ package net.i2p.router.transport.tcp;
*/ */
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress; import net.i2p.data.RouterAddress;
@ -28,10 +29,19 @@ public class TCPAddress {
public final static String PROP_HOST = "host"; public final static String PROP_HOST = "host";
public TCPAddress(String host, int port) { public TCPAddress(String host, int port) {
_host = host; try {
if (host != null) {
InetAddress iaddr = InetAddress.getByName(host);
_host = iaddr.getHostAddress();
_addr = iaddr;
}
_port = port; _port = port;
} catch (UnknownHostException uhe) {
_host = null;
_port = -1;
_addr = null; _addr = null;
} }
}
public TCPAddress() { public TCPAddress() {
_host = null; _host = null;
@ -40,13 +50,19 @@ public class TCPAddress {
} }
public TCPAddress(InetAddress addr, int port) { public TCPAddress(InetAddress addr, int port) {
_host = addr.getHostName(); if (addr != null)
_host = addr.getHostAddress();
_addr = addr; _addr = addr;
_port = port; _port = port;
} }
public TCPAddress(RouterAddress addr) { public TCPAddress(RouterAddress addr) {
if (addr == null) throw new IllegalArgumentException("Null router address"); if (addr == null) throw new IllegalArgumentException("Null router address");
_host = addr.getOptions().getProperty(PROP_HOST); String host = addr.getOptions().getProperty(PROP_HOST);
try {
InetAddress iaddr = InetAddress.getByName(host);
_host = iaddr.getHostAddress();
_addr = iaddr;
String port = addr.getOptions().getProperty(PROP_PORT); String port = addr.getOptions().getProperty(PROP_PORT);
if ( (port != null) && (port.trim().length() > 0) ) { if ( (port != null) && (port.trim().length() > 0) ) {
try { try {
@ -58,6 +74,10 @@ public class TCPAddress {
} else { } else {
_port = -1; _port = -1;
} }
} catch (UnknownHostException uhe) {
_host = null;
_port = -1;
}
} }
public String getHost() { return _host; } public String getHost() { return _host; }
@ -84,6 +104,8 @@ public class TCPAddress {
} }
} }
public String toString() { return _host + ":" + _port; }
public int hashCode() { public int hashCode() {
int rv = 0; int rv = 0;
rv += _port; rv += _port;
@ -96,13 +118,14 @@ public class TCPAddress {
public boolean equals(Object val) { public boolean equals(Object val) {
if ( (val != null) && (val instanceof TCPAddress) ) { if ( (val != null) && (val instanceof TCPAddress) ) {
TCPAddress addr = (TCPAddress)val; TCPAddress addr = (TCPAddress)val;
if (getAddress().getHostAddress() != null) if ( (_addr != null) && (_addr.getHostAddress() != null) ) {
return DataHelper.eq(getAddress().getHostAddress(), addr.getAddress().getHostAddress()) && return DataHelper.eq(getAddress().getHostAddress(), addr.getAddress().getHostAddress()) &&
(getPort() == addr.getPort()); (getPort() == addr.getPort());
else } else {
return DataHelper.eq(getHost(), addr.getHost()) && return DataHelper.eq(getHost(), addr.getHost()) &&
(getPort() == addr.getPort()); (getPort() == addr.getPort());
} }
}
return false; return false;
} }
} }

View File

@ -14,6 +14,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
@ -26,6 +27,7 @@ import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity; import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo; import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
@ -70,6 +72,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
private boolean _weInitiated; private boolean _weInitiated;
private long _created; private long _created;
protected RouterContext _context; protected RouterContext _context;
protected TCPAddress _remoteAddress;
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException { public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException {
_context = context; _context = context;
@ -101,8 +104,13 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
// doesn't, so we've got to check & cache it here if we want to log it later. (kaffe et al are acting per // doesn't, so we've got to check & cache it here if we want to log it later. (kaffe et al are acting per
// spec, btw) // spec, btw)
try { try {
_remoteHost = s.getInetAddress() + ""; InetAddress addr = s.getInetAddress();
if (addr != null) {
_remoteHost = addr.getHostAddress();
}
_remotePort = s.getPort(); _remotePort = s.getPort();
if (locallyInitiated)
_remoteAddress = new TCPAddress(_remoteHost, _remotePort);
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
throw new IOException("kaffe is being picky since the socket closed too fast..."); throw new IOException("kaffe is being picky since the socket closed too fast...");
} }
@ -110,6 +118,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
_log.info("Connected with peer: " + _remoteHost + ":" + _remotePort); _log.info("Connected with peer: " + _remoteHost + ":" + _remotePort);
} }
public TCPAddress getRemoteAddress() { return _remoteAddress; }
/** how long has this connection been around for, or -1 if it isn't established yet */ /** how long has this connection been around for, or -1 if it isn't established yet */
public long getLifetime() { public long getLifetime() {
if (_created > 0) if (_created > 0)
@ -215,6 +225,18 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
_in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv); _in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv);
_out = new AESOutputStream(_context, new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), _key, _iv); _out = new AESOutputStream(_context, new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), _key, _iv);
_socket.setSoTimeout(0); _socket.setSoTimeout(0);
for (Iterator iter = _remoteInfo.getAddresses().iterator(); iter.hasNext(); ) {
RouterAddress curAddr = (RouterAddress)iter.next();
if (TCPTransport.STYLE.equals(curAddr.getTransportStyle())) {
_remoteAddress = new TCPAddress(curAddr);
break;
}
}
if (_remoteAddress == null) {
throw new DataFormatException("wtf, peer " + _remoteIdentity.calculateHash().toBase64()
+ " unreachable? we already verified!");
}
established(); established();
return _remoteIdentity; return _remoteIdentity;
} }

View File

@ -42,12 +42,17 @@ public class TCPTransport extends TransportImpl {
private Log _log; private Log _log;
public final static String STYLE = "TCP"; public final static String STYLE = "TCP";
private List _listeners; private List _listeners;
private Map _connections; // routerIdentity --> List of TCPConnection /** RouterIdentity to List of TCPConnections */
private Map _connections;
/** TCPAddress (w/ IP not hostname) to List of TCPConnections */
private Map _connectionAddresses;
private String _listenHost; private String _listenHost;
private int _listenPort; private int _listenPort;
private RouterAddress _address; private RouterAddress _address;
private TCPAddress _tcpAddress;
private boolean _listenAddressIsValid; private boolean _listenAddressIsValid;
private Map _msgs; // H(ident) --> PendingMessages for unestablished connections /** H(ident) to PendingMessages for unestablished connections */
private Map _msgs;
private boolean _running; private boolean _running;
private int _numConnectionEstablishers; private int _numConnectionEstablishers;
@ -76,6 +81,7 @@ public class TCPTransport extends TransportImpl {
_listeners = new ArrayList(); _listeners = new ArrayList();
_connections = new HashMap(); _connections = new HashMap();
_connectionAddresses = new HashMap();
_msgs = new HashMap(); _msgs = new HashMap();
_address = address; _address = address;
if (address != null) { if (address != null) {
@ -86,6 +92,7 @@ public class TCPTransport extends TransportImpl {
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
_log.error("Invalid port: " + portStr + " Address: \n" + address, nfe); _log.error("Invalid port: " + portStr + " Address: \n" + address, nfe);
} }
_tcpAddress = new TCPAddress(_listenHost, _listenPort);
} }
_listenAddressIsValid = false; _listenAddressIsValid = false;
try { try {
@ -189,6 +196,38 @@ public class TCPTransport extends TransportImpl {
RouterAddress addr = (RouterAddress)iter.next(); RouterAddress addr = (RouterAddress)iter.next();
startEstablish = _context.clock().now(); startEstablish = _context.clock().now();
if (getStyle().equals(addr.getTransportStyle())) { if (getStyle().equals(addr.getTransportStyle())) {
TCPAddress tcpAddr = new TCPAddress(addr);
synchronized (_connectionAddresses) {
if (_connectionAddresses.containsKey(tcpAddr)) {
if (_log.shouldLog(Log.WARN))
_log.warn("We already have a connection to another router at " + tcpAddr);
_context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Duplicate TCP address (changed identities?)");
_context.netDb().fail(target.getIdentity().getHash());
return false;
}
}
if (tcpAddr.equals(_tcpAddress)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Peer " + target.getIdentity().getHash().toBase64()
+ " has OUR address [" + tcpAddr + "]");
_context.profileManager().commErrorOccurred(target.getIdentity().getHash());
_context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Points at us");
_context.netDb().fail(target.getIdentity().getHash());
return false;
}
if (!tcpAddr.isPubliclyRoutable() && false) {
if (_log.shouldLog(Log.WARN))
_log.warn("Peer " + target.getIdentity().getHash().toBase64()
+ " has an unroutable address [" + tcpAddr + "]");
_context.profileManager().commErrorOccurred(target.getIdentity().getHash());
_context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Unroutable address");
_context.netDb().fail(target.getIdentity().getHash());
return false;
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Establishing a connection with address " + addr); _log.debug("Establishing a connection with address " + addr);
Socket s = createSocket(addr); Socket s = createSocket(addr);
@ -384,6 +423,7 @@ public class TCPTransport extends TransportImpl {
allCons.add(con); allCons.add(con);
} }
} }
_connectionAddresses.clear();
} }
for (Iterator iter = allCons.iterator(); iter.hasNext(); ) { for (Iterator iter = allCons.iterator(); iter.hasNext(); ) {
TCPConnection con = (TCPConnection)iter.next(); TCPConnection con = (TCPConnection)iter.next();
@ -420,6 +460,13 @@ public class TCPTransport extends TransportImpl {
_connections.remove(iter.next()); _connections.remove(iter.next());
} }
} }
TCPAddress address = con.getRemoteAddress();
if (address != null) {
synchronized (_connectionAddresses) {
_connectionAddresses.remove(address);
}
}
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(buf.toString()); _log.info(buf.toString());
//if (con.getRemoteRouterIdentity() != null) //if (con.getRemoteRouterIdentity() != null)
@ -429,6 +476,30 @@ public class TCPTransport extends TransportImpl {
con.setTransport(this); con.setTransport(this);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Before establishing connection"); _log.debug("Before establishing connection");
TCPAddress remAddr = con.getRemoteAddress();
if (remAddr != null) {
synchronized (_connectionAddresses) {
if (_connectionAddresses.containsKey(remAddr)) {
if (_log.shouldLog(Log.WARN))
_log.warn("refusing connection from " + remAddr + " as it is a dup");
con.closeConnection();
return false;
}
}
if (_tcpAddress.equals(remAddr)) {
if (_log.shouldLog(Log.WARN))
_log.warn("refusing connection to ourselves...");
_context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Our old address");
_context.netDb().fail(target.getIdentity().getHash());
con.closeConnection();
return false;
}
} else {
//if (_log.shouldLog(Log.WARN))
// _log.warn("Why do we not have a remoteAddress for " + con, new Exception("hrm"));
}
long start = _context.clock().now(); long start = _context.clock().now();
RouterIdentity ident = con.establishConnection(); RouterIdentity ident = con.establishConnection();
long afterEstablish = _context.clock().now(); long afterEstablish = _context.clock().now();
@ -440,18 +511,26 @@ public class TCPTransport extends TransportImpl {
return false; return false;
} }
if (ident.equals(_context.router().getRouterInfo().getIdentity())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping established connection with *cough* ourselves: listenHost=["
+ _tcpAddress.getHost() + "] listenPort=[" +_tcpAddress.getPort()+ "] remoteHost=["
+ remAddr.getHost() + "] remPort=[" + remAddr.getPort() + "]");
con.closeConnection();
return false;
}
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Connection established with " + ident + " after " + (afterEstablish-start) + "ms"); _log.info("Connection established with " + ident + " after " + (afterEstablish-start) + "ms");
if (target != null) { if (target != null) {
if (!target.getIdentity().equals(ident)) { if (!target.getIdentity().equals(ident)) {
_context.statManager().updateFrequency("tcp.acceptFailureFrequency"); //_context.statManager().updateFrequency("tcp.acceptFailureFrequency");
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.WARN))
_log.error("Target changed identities!!! was " + target.getIdentity().getHash().toBase64() + ", now is " + ident.getHash().toBase64() + "! DROPPING CONNECTION"); _log.warn("Target changed identities! was " + target.getIdentity().getHash().toBase64() + ", now is " + ident.getHash().toBase64() + "!");
con.closeConnection();
// remove the old ref, since they likely just created a new identity // remove the old ref, since they likely just created a new identity
_context.netDb().fail(target.getIdentity().getHash()); _context.netDb().fail(target.getIdentity().getHash());
_context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Peer changed identities"); _context.shitlist().shitlistRouter(target.getIdentity().getHash(), "Peer changed identities");
return false; //return false;
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Target is the same as who we connected with"); _log.debug("Target is the same as who we connected with");
@ -459,14 +538,34 @@ public class TCPTransport extends TransportImpl {
} }
if (ident != null) { if (ident != null) {
Set toClose = new HashSet(4); Set toClose = new HashSet(4);
List toAdd = new LinkedList(); List toAdd = new ArrayList(1);
List cons = null;
synchronized (_connections) { synchronized (_connections) {
if (!_connections.containsKey(ident)) if (!_connections.containsKey(ident))
_connections.put(ident, new ArrayList(2)); _connections.put(ident, new ArrayList(2));
List cons = (List)_connections.get(ident); cons = (List)_connections.get(ident);
if (cons.size() > 0) { if (cons.size() > 0) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN)) {
_log.warn("Attempted to open additional connections with " + ident.getHash() + ": closing older connections", new Exception("multiple cons")); _log.warn("Attempted to open additional connections with " + ident.getHash() + ": closing older connections", new Exception("multiple cons"));
StringBuffer buf = new StringBuffer(128);
if (remAddr == null)
remAddr = con.getRemoteAddress();
buf.append("Connection address: [").append(remAddr.toString()).append(']');
synchronized (_connectionAddresses) {
if (_connectionAddresses.containsKey(remAddr)) {
buf.append(" NOT KNOWN in: ");
} else {
buf.append(" KNOWN IN: ");
}
for (Iterator iter = _connectionAddresses.keySet().iterator(); iter.hasNext(); ) {
TCPAddress curAddr = (TCPAddress)iter.next();
buf.append('[').append(curAddr.toString()).append("] ");
}
}
_log.warn(buf.toString());
}
while (cons.size() > 0) { while (cons.size() > 0) {
TCPConnection oldCon = (TCPConnection)cons.remove(0); TCPConnection oldCon = (TCPConnection)cons.remove(0);
toAdd.addAll(oldCon.getPendingMessages()); toAdd.addAll(oldCon.getPendingMessages());
@ -487,6 +586,10 @@ public class TCPTransport extends TransportImpl {
} }
} }
synchronized (_connectionAddresses) {
_connectionAddresses.put(con.getRemoteAddress(), cons);
}
if (toAdd.size() > 0) { if (toAdd.size() > 0) {
for (Iterator iter = toAdd.iterator(); iter.hasNext(); ) { for (Iterator iter = toAdd.iterator(); iter.hasNext(); ) {
OutNetMessage msg = (OutNetMessage)iter.next(); OutNetMessage msg = (OutNetMessage)iter.next();
@ -616,7 +719,9 @@ public class TCPTransport extends TransportImpl {
refetchedCon = _context.clock().now(); refetchedCon = _context.clock().now();
if (con == null) { if (con == null) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Connection established but we can't find the connection? wtf! peer = " + pending.getPeer()); _log.error("Connection established to " + pending.getPeer().toBase64() + " but they aren't who we wanted");
_context.shitlist().shitlistRouter(pending.getPeer(), "Old address of a new peer");
failPending(pending);
} else { } else {
_context.shitlist().unshitlistRouter(pending.getPeer()); _context.shitlist().unshitlistRouter(pending.getPeer());
sendPending(con, pending); sendPending(con, pending);
@ -690,7 +795,7 @@ public class TCPTransport extends TransportImpl {
_log.debug("Adding a pending to existing " + target.toBase64()); _log.debug("Adding a pending to existing " + target.toBase64());
} }
int level = Log.INFO; int level = Log.INFO;
if (msgs.getMessageCount() > 1) if (msgs.getMessageCount() > 2)
level = Log.WARN; level = Log.WARN;
if (_log.shouldLog(level)) if (_log.shouldLog(level))
_log.log(level, "Add message to " + target.toBase64() + ", making a total of " + msgs.getMessageCount() + " for them, with another " + (_msgs.size() -1) + " peers pending establishment"); _log.log(level, "Add message to " + target.toBase64() + ", making a total of " + msgs.getMessageCount() + " for them, with another " + (_msgs.size() -1) + " peers pending establishment");
@ -754,12 +859,20 @@ public class TCPTransport extends TransportImpl {
_log.info("Connection established, now queueing up " + pending.getMessageCount() + " messages to be sent"); _log.info("Connection established, now queueing up " + pending.getMessageCount() + " messages to be sent");
synchronized (_msgs) { synchronized (_msgs) {
_msgs.remove(pending.getPeer()); _msgs.remove(pending.getPeer());
}
if (pending.getPeer().equals(con.getRemoteRouterIdentity().calculateHash())) {
OutNetMessage msg = null; OutNetMessage msg = null;
while ( (msg = pending.getNextMessage()) != null) { while ( (msg = pending.getNextMessage()) != null) {
msg.timestamp("TCPTransport.sendPending to con.addMessage"); msg.timestamp("TCPTransport.sendPending to con.addMessage");
con.addMessage(msg); con.addMessage(msg);
} }
} else {
// we connected to someone we didn't try to connect to...
OutNetMessage msg = null;
while ( (msg = pending.getNextMessage()) != null) {
msg.timestamp("TCPTransport.sendPending connected to a different peer");
afterSend(msg, false, false);
}
} }
} }
@ -790,7 +903,7 @@ public class TCPTransport extends TransportImpl {
private ConnEstablisher _establisher; private ConnEstablisher _establisher;
public PendingMessages(RouterInfo peer) { public PendingMessages(RouterInfo peer) {
_messages = new LinkedList(); _messages = new ArrayList(2);
_peerInfo = peer; _peerInfo = peer;
_peer = peer.getIdentity().getHash(); _peer = peer.getIdentity().getHash();
_establisher = null; _establisher = null;