flush the protocol flag explicitly

make the tcp connection handler nonblocking by adding another (very short lived) thread - this prevents a peer connecting to us that is very slow (or unconnectable) from forcing other cons to timeout
completely ripped out the fscking bandwidth limiter until i get it more reliable
gave threads more explicit names (for the sim)
logging
This commit is contained in:
jrandom
2004-06-25 18:14:12 +00:00
committed by zzz
parent a019399c3c
commit 57801202fd
5 changed files with 73 additions and 32 deletions

View File

@ -223,7 +223,7 @@ class RestrictiveTCPConnection extends TCPConnection {
SocketCreator creator = new SocketCreator(peer.getHost(), peer.getPort(), false);
I2PThread sockCreator = new I2PThread(creator);
sockCreator.setDaemon(true);
sockCreator.setName("PeerCallback");
sockCreator.setName("PeerCallback:" + _transport.getListenPort());
sockCreator.setPriority(I2PThread.MIN_PRIORITY);
sockCreator.start();
@ -294,8 +294,12 @@ class RestrictiveTCPConnection extends TCPConnection {
if (_log.shouldLog(Log.INFO))
_log.info("TCP connection " + _id + " established with " + _remoteIdentity.getHash().toBase64());
_in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv);
_out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv);
//_in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv);
//_out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv);
//_in = new BandwidthLimitedInputStream(_context, _in, _remoteIdentity);
//_out = new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE);
_in = new AESInputStream(_context, _in, _key, _iv);
_out = new AESOutputStream(_context, _out, _key, _iv);
_socket.setSoTimeout(0);
success = _context.clock().now();
established();

View File

@ -1,6 +1,7 @@
package net.i2p.router.transport.tcp;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
@ -31,6 +32,8 @@ class SocketCreator implements Runnable {
/** the first byte sent and received must be 0x2A */
public final static int I2P_FLAG = 0x2A;
/** sent if we arent trying to talk */
private final static int NOT_I2P_FLAG = 0x2B;
public void run() {
if (_keepOpen) {
@ -45,7 +48,9 @@ class SocketCreator implements Runnable {
_socket = new Socket(_host, _port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Socket created");
_socket.getOutputStream().write(I2P_FLAG);
OutputStream os = _socket.getOutputStream();
os.write(I2P_FLAG);
os.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P flag sent");
int val = _socket.getInputStream().read();
@ -86,6 +91,11 @@ class SocketCreator implements Runnable {
_socket = new Socket(_host, _port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Socket created (but we're not sending the flag, since we're just testing them)");
OutputStream os = _socket.getOutputStream();
os.write(NOT_I2P_FLAG);
os.flush();
int val = _socket.getInputStream().read();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]");

View File

@ -244,7 +244,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
t.setName("Run Conn [" + _id + "]");
t.setDaemon(true);
t.start();
_reader = new I2NPMessageReader(_context, _in, this, "TCP Read [" + _id + "]");
_reader = new I2NPMessageReader(_context, _in, this, "TCP Read [" + _id + ":" + _transport.getListenPort() + "]");
_reader.startReading();
}
@ -351,7 +351,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
OutNetMessage msg = (OutNetMessage)iter.next();
msg.timestamp("TCPTransport.closeConnection caused fail");
if (_log.shouldLog(Log.WARN))
_log.warn("Connection closed while the message was sitting on the TCP Connection's queue! too slow by: "
_log.warn("Connection closed to " + _remoteIdentity.getHash().toBase64()
+ " while the message was sitting on the TCP Connection's queue! too slow by: "
+ (now-msg.getExpiration()) + "ms: " + msg);
_transport.afterSend(msg, false, false);
}

View File

@ -9,6 +9,7 @@ package net.i2p.router.transport.tcp;
*/
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
@ -124,26 +125,8 @@ class TCPListener {
if (_log.shouldLog(Log.INFO))
_log.info("Connection handled on " + _myAddress.getHost() + ":" + _myAddress.getPort() + " with " + s.getInetAddress().toString() + ":" + s.getPort());
TimedHandler h = new TimedHandler(s);
I2PThread t = new I2PThread(h);
t.setDaemon(true);
t.start();
synchronized (h) {
h.wait(HANDLE_TIMEOUT);
}
if (h.wasSuccessful()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle successful");
} else {
if (h.receivedIdentByte()) {
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to handle in the time allotted");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning");
}
try { s.close(); } catch (IOException ioe) {}
}
handle(s);
} catch (SocketException se) {
_log.error("Error handling a connection - closed?", se);
return;
@ -154,8 +137,48 @@ class TCPListener {
}
}
private void handle(Socket s) {
I2PThread t = new I2PThread(new BlockingHandler(s));
t.setDaemon(true);
t.setName("BlockingHandler:"+_transport.getListenPort());
t.start();
}
private class BlockingHandler implements Runnable {
private Socket _handledSocket;
public BlockingHandler(Socket socket) {
_handledSocket = socket;
}
public void run() {
TimedHandler h = new TimedHandler(_handledSocket);
I2PThread t = new I2PThread(h);
t.setDaemon(true);
t.start();
try {
synchronized (h) {
h.wait(HANDLE_TIMEOUT);
}
} catch (InterruptedException ie) {
// got through early...
}
if (h.wasSuccessful()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle successful");
} else {
if (h.receivedIdentByte()) {
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to handle in the time allotted");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning");
}
try { _handledSocket.close(); } catch (IOException ioe) {}
}
}
}
/** if we're not making progress in 30s, drop 'em */
private final static long HANDLE_TIMEOUT = 30*1000;
private final static long HANDLE_TIMEOUT = 10*1000;
private static volatile int __handlerId = 0;
private class TimedHandler implements Runnable {
@ -170,9 +193,11 @@ class TCPListener {
_receivedIdentByte = false;
}
public void run() {
Thread.currentThread().setName("TimedHandler"+_handlerId);
Thread.currentThread().setName("TimedHandler"+_handlerId + ':' + _transport.getListenPort());
try {
_socket.getOutputStream().write(SocketCreator.I2P_FLAG);
OutputStream os = _socket.getOutputStream();
os.write(SocketCreator.I2P_FLAG);
os.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("listener: I2P flag sent");
int val = _socket.getInputStream().read();

View File

@ -101,6 +101,7 @@ public class TCPTransport extends TransportImpl {
boolean getListenAddressIsValid() { return _listenAddressIsValid; }
SigningPrivateKey getMySigningKey() { return _context.keyManager().getSigningPrivateKey(); }
int getListenPort() { return _listenPort; }
/** fetch all of our TCP listening addresses */
TCPAddress[] getMyAddresses() {
@ -235,7 +236,7 @@ public class TCPTransport extends TransportImpl {
SocketCreator creator = new SocketCreator(host, port);
I2PThread sockCreator = new I2PThread(creator);
sockCreator.setDaemon(true);
sockCreator.setName("SocketCreator");
sockCreator.setName("SocketCreator_:" + _listenPort);
sockCreator.setPriority(I2PThread.MIN_PRIORITY);
sockCreator.start();
@ -432,7 +433,7 @@ public class TCPTransport extends TransportImpl {
}
if (_log.shouldLog(Log.INFO))
_log.info("Connection established with " + ident);
_log.info("Connection established with " + ident + " after " + (afterEstablish-start) + "ms");
if (target != null) {
if (!target.getIdentity().equals(ident)) {
_context.statManager().updateFrequency("tcp.acceptFailureFrequency");
@ -574,7 +575,7 @@ public class TCPTransport extends TransportImpl {
public int getId() { return _id; }
public void run() {
Thread.currentThread().setName("Conn Establisher" + _id);
Thread.currentThread().setName("Conn Establisher" + _id + ':' + _listenPort);
while (_running) {
try {