more tcp transport updates (getting closer to the old functionality)

* avoid bad peers
* shitlist appropriately
* include the bandwidth limiter
* add socket timeouts
* deal with *cough* closing connections
* javadocs
This commit is contained in:
jrandom
2004-09-26 18:11:39 +00:00
committed by zzz
parent 0f54ba59fb
commit 63355ecd5b
8 changed files with 74 additions and 21 deletions

View File

@ -260,14 +260,12 @@ public class TransportManager implements TransportEventListener {
buf.append(addr.toString()).append("\n\n");
}
buf.append("</pre>\n");
buf.append("<ul>\n");
for (Iterator iter = _transports.iterator(); iter.hasNext(); ) {
Transport t = (Transport)iter.next();
String str = t.renderStatusHTML();
if (str != null)
buf.append("<li>").append(str).append("</li>\n");
buf.append(str);
}
buf.append("</ul>\n");
out.write(buf.toString().getBytes());
}
}

View File

@ -1,5 +1,6 @@
package net.i2p.router.transport.tcp;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -7,6 +8,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.Socket;
import java.net.SocketException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -25,6 +27,8 @@ import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.BandwidthLimitedInputStream;
import net.i2p.router.transport.BandwidthLimitedOutputStream;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.NativeBigInteger;
@ -78,6 +82,8 @@ public class ConnectionBuilder {
/** If the connection hasn't been built in 10 seconds, give up */
public static final int CONNECTION_TIMEOUT = 10*1000;
public static final int WRITE_BUFFER_SIZE = 2*1024;
public ConnectionBuilder(RouterContext context, TCPTransport transport, RouterInfo info) {
_context = context;
_log = context.logManager().getLog(ConnectionBuilder.class);
@ -106,6 +112,8 @@ public class ConnectionBuilder {
createSocket();
if ( (_socket == null) || (_error != null) )
return null;
try { _socket.setSoTimeout(CONNECTION_TIMEOUT); } catch (SocketException se) {}
negotiateProtocol();
@ -123,7 +131,9 @@ public class ConnectionBuilder {
if (ok && (_error == null) ) {
establishComplete();
try { _socket.setSoTimeout(0); } catch (SocketException se) {}
TCPConnection con = new TCPConnection(_context);
con.setInputStream(_connectionIn);
con.setOutputStream(_connectionOut);
@ -209,8 +219,8 @@ public class ConnectionBuilder {
try {
// #bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties
int numBytes = (int)DataHelper.readLong(_rawIn, 2);
// 0xFF is a reserved value
if ( (numBytes <= 0) || (numBytes >= 0xFF) )
// 0xFFFF is a reserved value
if ( (numBytes <= 0) || (numBytes >= 0xFFFF) )
throw new IOException("Invalid number of bytes in response");
byte line[] = new byte[numBytes];
@ -620,9 +630,9 @@ public class ConnectionBuilder {
*
*/
private void establishComplete() {
// todo: add bw limiter
_connectionIn = _rawIn;
_connectionOut = _rawOut;
_connectionIn = new BandwidthLimitedInputStream(_context, _rawIn, _actualPeer.getIdentity());
OutputStream blos = new BandwidthLimitedOutputStream(_context, _rawOut, _actualPeer.getIdentity());
_connectionOut = new BufferedOutputStream(blos, WRITE_BUFFER_SIZE);
Hash peer = _actualPeer.getIdentity().getHash();
_context.netDb().store(peer, _actualPeer);
@ -658,8 +668,10 @@ public class ConnectionBuilder {
fail(error, null);
}
private void fail(String error, Exception e) {
if (_error == null) // only grab the first error
if (_error == null) {
// only grab the first error
_error = error;
}
if (_rawIn != null) try { _rawIn.close(); } catch (IOException ioe) {}
if (_rawOut != null) try { _rawOut.close(); } catch (IOException ioe) {}

View File

@ -1,5 +1,6 @@
package net.i2p.router.transport.tcp;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -7,6 +8,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.text.SimpleDateFormat;
@ -24,8 +26,10 @@ import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.BandwidthLimitedInputStream;
import net.i2p.router.transport.BandwidthLimitedOutputStream;
import net.i2p.util.Log;
import net.i2p.util.NativeBigInteger;
@ -85,6 +89,7 @@ public class ConnectionHandler {
_error = null;
_agreedProtocol = -1;
InetAddress addr = _socket.getInetAddress();
try { _socket.setSoTimeout(TCPListener.HANDLE_TIMEOUT); } catch (SocketException se) {}
if (addr != null) {
_from = addr.getHostAddress();
}
@ -123,6 +128,8 @@ public class ConnectionHandler {
if (ok && (_error == null) ) {
establishComplete();
try { _socket.setSoTimeout(0); } catch (SocketException se) {}
if (_log.shouldLog(Log.INFO))
_log.info("Establishment ok... building the con");
@ -166,7 +173,7 @@ public class ConnectionHandler {
int numBytes = (int)DataHelper.readLong(_rawIn, 2);
if (numBytes <= 0)
throw new IOException("Invalid number of bytes in connection");
// 0xFF is a reserved value identifying the connection as a reachability test
// 0xFFFF is a reserved value identifying the connection as a reachability test
if (numBytes == 0xFFFF) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ReadProtocol[Y]: test called, handle it");
@ -635,6 +642,8 @@ public class ConnectionHandler {
OutputStream out = s.getOutputStream();
InputStream in = s.getInputStream();
try { s.setSoTimeout(TCPListener.HANDLE_TIMEOUT); } catch (SocketException se) {}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Beginning verification of reachability");
@ -691,7 +700,7 @@ public class ConnectionHandler {
/**
* The peer contacting us is just testing us. Verify that we are reachable
* by following the protocol, then close the socket. This is called only
* after reading the initial 0xFF.
* after reading the initial 0xFFFF.
*
*/
private void handleTest() {
@ -721,7 +730,7 @@ public class ConnectionHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("HandleTest: version=" + version + " opts=" +opts);
// send: 0xFF + versionOk + #bytesIP + IP + currentTime + properties
// send: 0xFFFF + versionOk + #bytesIP + IP + currentTime + properties
_rawOut.write(0xFF);
_rawOut.write(0xFF);
_rawOut.write(version);
@ -755,7 +764,6 @@ public class ConnectionHandler {
_actualPeer = null;
_testComplete = true;
}
// send: 0xFF + versionOk + #bytesIP + IP + currentTime + properties
}
/**
@ -764,9 +772,9 @@ public class ConnectionHandler {
*
*/
private void establishComplete() {
// todo: add bw limiter
_connectionIn = _rawIn;
_connectionOut = _rawOut;
_connectionIn = new BandwidthLimitedInputStream(_context, _rawIn, _actualPeer.getIdentity());
OutputStream blos = new BandwidthLimitedOutputStream(_context, _rawOut, _actualPeer.getIdentity());
_connectionOut = new BufferedOutputStream(blos, ConnectionBuilder.WRITE_BUFFER_SIZE);
Hash peer = _actualPeer.getIdentity().getHash();
_context.netDb().store(peer, _actualPeer);

View File

@ -58,7 +58,11 @@ public class TCPAddress {
}
public TCPAddress(RouterAddress addr) {
if (addr == null) throw new IllegalArgumentException("Null router address");
if (addr == null) {
_host = null;
_port = -1;
return;
}
String host = addr.getOptions().getProperty(PROP_HOST);
try {
InetAddress iaddr = InetAddress.getByName(host);

View File

@ -9,6 +9,7 @@ import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessageReader;
@ -95,6 +96,11 @@ public class TCPConnection {
OutNetMessage msg = (OutNetMessage)msgs.get(0);
_transport.afterSend(msg, false, true, -1);
}
_context.profileManager().commErrorOccurred(_ident.getHash());
_transport.addConnectionErrorMessage("Connection closed with "
+ _ident.getHash().toBase64().substring(0,6)
+ " after " + DataHelper.formatDuration(getLifetime()));
_transport.connectionClosed(this);
}
/**

View File

@ -1,5 +1,6 @@
package net.i2p.router.transport.tcp;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@ -30,7 +31,10 @@ public class TCPConnectionEstablisher implements Runnable {
_transport.connectionEstablished(con);
} else {
_transport.addConnectionErrorMessage(cb.getError());
_context.shitlist().shitlistRouter(info.getIdentity().getHash(), "Unable to contact");
Hash peer = info.getIdentity().getHash();
_context.profileManager().commErrorOccurred(peer);
_context.shitlist().shitlistRouter(peer, "Unable to contact");
_context.netDb().fail(peer);
}
// this removes the _pending block on the address and

View File

@ -48,7 +48,7 @@ class TCPListener {
*/
private final static int MAX_FAIL_DELAY = 5*60*1000;
/** if we're not making progress in 10s, drop 'em */
private final static long HANDLE_TIMEOUT = 10*1000;
final static int HANDLE_TIMEOUT = 10*1000;
/** id generator for the connections */
private static volatile int __handlerId = 0;

View File

@ -126,6 +126,14 @@ public class TCPTransport extends TransportImpl {
}
public TransportBid bid(RouterInfo toAddress, long dataSize) {
RouterAddress addr = toAddress.getTargetAddress(STYLE);
if (addr == null)
return null;
TCPAddress tcpAddr = new TCPAddress(addr);
if ( (_myAddress != null) && (tcpAddr.equals(_myAddress)) )
return null; // dont talk to yourself
TransportBid bid = new TransportBid();
bid.setBandwidthBytes((int)dataSize);
bid.setExpiration(_context.clock().now() + 30*1000);
@ -136,6 +144,7 @@ public class TCPTransport extends TransportImpl {
if (!getIsConnected(toAddress.getIdentity()))
latency += 5000;
bid.setLatencyMs(latency);
return bid;
}
@ -247,6 +256,13 @@ public class TCPTransport extends TransportImpl {
_log.debug("Connection set to run");
}
void connectionClosed(TCPConnection con) {
synchronized (_connectionLock) {
_connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash());
_connectionsByAddress.remove(con.getRemoteAddress().toString());
}
}
/**
* Blocking call from when a remote peer tells us what they think our
* IP address is. This may do absolutely nothing, or it may fire up a
@ -479,6 +495,11 @@ public class TCPTransport extends TransportImpl {
continue; // uh...
OutNetMessage msg = (OutNetMessage)msgs.get(0);
RouterAddress addr = msg.getTarget().getTargetAddress(STYLE);
if (addr == null) {
_log.error("Message target has no TCP addresses! " + msg.getTarget());
iter.remove();
continue;
}
TCPAddress tcpAddr = new TCPAddress(addr);
if (tcpAddr.getPort() <= 0)
continue; // invalid