2004-11-27 jrandom

* Some cleanup and bugfixes for the IP address detection code where we
      only consider connections that have actually sent and received messages
      recently as active, rather than the mere presence of a TCP socket as
      activity.
This commit is contained in:
jrandom
2004-11-27 21:02:06 +00:00
committed by zzz
parent 35e94a7f65
commit 72be9b5f04
7 changed files with 65 additions and 11 deletions

View File

@ -325,8 +325,8 @@ public class Connection {
_occurredEventCount++;
} else {
_occurredTime = now;
if (_occurredEventCount > 5) {
_log.log(Log.CRIT, "More than 5 events (" + _occurredEventCount + ") in a second on "
if (_occurredEventCount > 10) {
_log.log(Log.CRIT, "More than 10 events (" + _occurredEventCount + ") in a second on "
+ toString() + ": scheduler = " + sched);
}
_occurredEventCount = 0;

View File

@ -1,4 +1,10 @@
$Id: history.txt,v 1.84 2004/11/26 22:54:18 jrandom Exp $
$Id: history.txt,v 1.85 2004/11/27 00:17:06 jrandom Exp $
2004-11-27 jrandom
* Some cleanup and bugfixes for the IP address detection code where we
only consider connections that have actually sent and received messages
recently as active, rather than the mere presence of a TCP socket as
activity.
2004-11-27 jrandom
* Removed the I2PTunnel inactivity timeout thread, since the new streaming

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.89 $ $Date: 2004/11/26 22:54:17 $";
public final static String ID = "$Revision: 1.90 $ $Date: 2004/11/27 00:17:06 $";
public final static String VERSION = "0.4.2";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -30,6 +30,7 @@ public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListene
}
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead, int size) {
_con.messageReceived();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Just received message " + message.getUniqueId() + " from "
+ _identHash.toBase64().substring(0,6)

View File

@ -40,6 +40,8 @@ public class TCPConnection {
private RateStat _sendRate;
private long _started;
private boolean _closed;
private long _lastRead;
private long _lastWrite;
public TCPConnection(RouterContext ctx) {
_context = ctx;
@ -53,6 +55,8 @@ public class TCPConnection {
_transport = null;
_started = -1;
_closed = false;
_lastRead = 0;
_lastWrite = 0;
_runner = new ConnectionRunner(_context, this);
_context.statManager().createRateStat("tcp.probabalisticDropQueueSize", "How many bytes were queued to be sent when a message as dropped probabalistically?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_context.statManager().createRateStat("tcp.queueSize", "How many bytes were queued on a connection?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
@ -359,6 +363,19 @@ public class TCPConnection {
boolean getIsClosed() { return _closed; }
RouterContext getRouterContext() { return _context; }
boolean getIsActive() {
if ( (_lastRead <= 0) || (_lastWrite <= 0) ) return false;
long recent = (_lastRead > _lastWrite ? _lastRead : _lastWrite);
long howLongAgo = _context.clock().now() - recent;
if (howLongAgo < 1*60*1000)
return true;
else
return false;
}
void messageReceived() {
_lastRead = _context.clock().now();
}
/**
* The message was sent.
*
@ -370,5 +387,7 @@ public class TCPConnection {
_transport.afterSend(msg, ok, true, time);
if (ok)
_sendRate.addData(msg.getMessageSize(), msg.getLifetime());
if (ok)
_lastWrite = _context.clock().now();
}
}

View File

@ -137,7 +137,7 @@ class TCPListener {
public void run() {
if (_log.shouldLog(Log.INFO))
_log.info("Beginning TCP listener");
_log.info("Beginning TCP listener on " + _myAddress);
int curDelay = 0;
while (_isRunning) {

View File

@ -349,6 +349,8 @@ public class TCPTransport extends TransportImpl {
if (_myAddress != null) {
if (addr.getAddress().equals(_myAddress.getAddress())) {
// ignore, since there is no change
if (_log.shouldLog(Log.INFO))
_log.info("Not updating our local address, as it hasnt changed from " + address);
return;
}
}
@ -363,6 +365,8 @@ public class TCPTransport extends TransportImpl {
} else {
// either we have explicitly specified our IP address, or
// we are already connected to some people.
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not allowing address update");
}
}
}
@ -505,11 +509,22 @@ public class TCPTransport extends TransportImpl {
*
*/
private boolean allowAddressUpdate() {
boolean addressSpecified = (null != _context.getProperty(LISTEN_ADDRESS));
if (addressSpecified)
return false;
int connectedPeers = countActivePeers();
return (connectedPeers == 0);
boolean addressSpecified = (null != _context.getProperty(LISTEN_ADDRESS));
if (addressSpecified) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not allowing address update, sicne we have one specified (#cons=" + connectedPeers + ")");
return false;
}
if (connectedPeers <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allowing address update, since the # of connected peers is " + connectedPeers);
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not allowing address update, since the # of connected peers is " + connectedPeers);
return false;
}
}
/**
@ -544,9 +559,22 @@ public class TCPTransport extends TransportImpl {
*
*/
public int countActivePeers() {
int numActive = 0;
int numInactive = 0;
synchronized (_connectionLock) {
return _connectionsByIdent.size();
if (_connectionsByIdent.size() <= 0) return 0;
for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) {
TCPConnection con = (TCPConnection)iter.next();
if (con.getIsActive())
numActive++;
else
numInactive++;
}
}
if ( (numInactive > 0) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Inactive peers: " + numInactive + " active: " + numActive);
return numActive;
}
/**