2004-10-06 jrandom

* Implement an active queue management scheme on the TCP transports,
      dropping messages probabalistically as the queue fills up.  The
      estimated queue capacity is determined by the rate at which messages
      have been sent to the peer (averaged at 1, 5, and 60m periods).  As
      we exceed 1/2 of the estimated capacity, we drop messages throughout
      the queue probabalistically with regards to their size.  This is based
      on RFC 2309's RED, with the minimum threshold set to 1/2 the
      estimated connection capacity.  We may want to consider using a send
      rate and queue size measured across all connections, to deal with our
      own local bandwidth saturation, but we'll try the per-con metrics first.
This commit is contained in:
jrandom
2004-10-06 21:03:51 +00:00
committed by zzz
parent 39d4e5ea81
commit 38c422bbc0
4 changed files with 111 additions and 4 deletions

View File

@ -1,4 +1,16 @@
$Id: history.txt,v 1.35 2004/10/05 20:12:04 jrandom Exp $
$Id: history.txt,v 1.36 2004/10/06 08:23:38 jrandom Exp $
2004-10-06 jrandom
* Implement an active queue management scheme on the TCP transports,
dropping messages probabalistically as the queue fills up. The
estimated queue capacity is determined by the rate at which messages
have been sent to the peer (averaged at 1, 5, and 60m periods). As
we exceed 1/2 of the estimated capacity, we drop messages throughout
the queue probabalistically with regards to their size. This is based
on RFC 2309's RED, with the minimum threshold set to 1/2 the
estimated connection capacity. We may want to consider using a send
rate and queue size measured across all connections, to deal with our
own local bandwidth saturation, but we'll try the per-con metrics first.
2004-10-06 jrandom
* Enable explicit disabling of the systray entirely for windows machines

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.43 $ $Date: 2004/10/05 10:39:19 $";
public final static String ID = "$Revision: 1.44 $ $Date: 2004/10/05 20:12:03 $";
public final static String VERSION = "0.4.1.1";
public final static long BUILD = 9;
public final static long BUILD = 10;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -16,6 +16,8 @@ import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessageReader;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.stat.RateStat;
import net.i2p.stat.Rate;
import net.i2p.util.Log;
/**
@ -35,6 +37,7 @@ public class TCPConnection {
private TCPTransport _transport;
private ConnectionRunner _runner;
private I2NPMessageReader _reader;
private RateStat _sendRate;
private long _started;
private boolean _closed;
@ -51,6 +54,9 @@ public class TCPConnection {
_started = -1;
_closed = false;
_runner = new ConnectionRunner(_context, this);
_context.statManager().createRateStat("tcp.probabalisticDropQueueSize", "How many bytes were queued to be sent when a message as dropped probabalistically?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_context.statManager().createRateStat("tcp.queueSize", "How many bytes were queued on a connection?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_context.statManager().createRateStat("tcp.sendBps", "How fast are we sending data to a peer?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
}
/** Who are we talking with (or null if not identified) */
@ -72,7 +78,11 @@ public class TCPConnection {
*
*/
public void runConnection() {
String name = "TCP Read [" + _ident.calculateHash().toBase64().substring(0,6) + "]";
String peer = _ident.calculateHash().toBase64().substring(0,6);
String name = "TCP Read [" + peer + "]";
_sendRate = new RateStat("tcp.sendRatePeer", "How many bytes are in the messages sent to " + peer, peer, new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_reader = new I2NPMessageReader(_context, _in, new MessageHandler(_transport, this), name);
_reader.startReading();
_runner.startRunning();
@ -143,10 +153,87 @@ public class TCPConnection {
msg.timestamp("TCPConnection.addMessage");
synchronized (_pendingMessages) {
_pendingMessages.add(msg);
locked_throttle();
_pendingMessages.notifyAll();
}
}
/**
* Implement a probabalistic dropping of messages on the queue to the
* peer along the lines of RFC2309.
*
*/
private void locked_throttle() {
int bytesQueued = 0;
long earliestExpiration = -1;
for (int i = 0; i < _pendingMessages.size(); i++) {
OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i);
bytesQueued += (int)msg.getMessageSize();
if ( (earliestExpiration < 0) || (msg.getExpiration() < earliestExpiration) )
earliestExpiration = msg.getExpiration();
}
if (bytesQueued > 0)
_context.statManager().addRateData("tcp.queueSize", bytesQueued, _pendingMessages.size());
long sendRate = getSendRate();
long bytesSendableUntilFirstExpire = sendRate * (earliestExpiration - _context.clock().now()) / 1000;
// try to keep the queue less than half full
long excessQueued = bytesQueued - (bytesSendableUntilFirstExpire/2);
if ( (excessQueued > 0) && (_pendingMessages.size() > 1) && (_transport != null) )
locked_probabalisticDrop(excessQueued);
}
/** how many Bps we are sending data to the peer (or 2KBps if we don't know) */
public long getSendRate() {
if (_sendRate == null) return 2*1024;
_sendRate.coallesceStats();
Rate r = _sendRate.getRate(60*1000);
if (r == null) {
return 2*1024;
} else if (r.getLastEventCount() <= 2) {
r = _sendRate.getRate(5*60*1000);
if (r.getLastEventCount() <= 2)
r = _sendRate.getRate(60*60*1000);
}
if (r.getLastEventCount() <= 2) {
return 2*1024;
} else {
long bps = (long)(r.getLastTotalValue() * 1000 / r.getLastTotalEventTime());
_context.statManager().addRateData("tcp.sendBps", bps, 0);
return bps;
}
}
/**
* Probabalistically drop messages in relation to their size vs how much
* we've exceeded our target queue usage.
*/
private void locked_probabalisticDrop(long excessBytesQueued) {
for (int i = 0; i < _pendingMessages.size() && excessBytesQueued > 0; i++) {
OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i);
int p = getDropProbability(msg.getMessageSize(), excessBytesQueued);
if (_context.random().nextInt(100) > p) {
_pendingMessages.remove(i);
i--;
msg.timestamp("Probabalistically dropped due to queue size " + excessBytesQueued);
sent(msg, false, -1);
_context.statManager().addRateData("tcp.probabalisticDropQueueSize", excessBytesQueued, msg.getLifetime());
// since we've already dropped down this amount, lets reduce the
// number of additional messages dropped
excessBytesQueued -= msg.getMessageSize();
}
}
}
private int getDropProbability(long msgSize, long excessBytesQueued) {
if (msgSize > excessBytesQueued)
return 100;
return (int)(100.0*(msgSize/excessBytesQueued));
}
/**
* Blocking call to retrieve the next pending message. As a side effect,
* this fails messages on the queue that have expired, and in turn never
@ -244,5 +331,7 @@ public class TCPConnection {
*/
void sent(OutNetMessage msg, boolean ok, long time) {
_transport.afterSend(msg, ok, true, time);
if (ok)
_sendRate.addData(msg.getMessageSize(), msg.getLifetime());
}
}

View File

@ -698,6 +698,12 @@ public class TCPTransport extends TransportImpl {
buf.append("<li>");
buf.append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6));
buf.append(": up for ").append(DataHelper.formatDuration(con.getLifetime()));
buf.append(" transferring at ");
long bps = con.getSendRate();
if (bps < 1024)
buf.append(bps).append("Bps");
else
buf.append((int)(bps/1024)).append("KBps");
buf.append("</li>\n");
}
buf.append("</ul>\n");