2006-04-11 jrandom

* Throttling improvements on SSU - throttle all transmissions to a peer
      when we are retransmitting, not just retransmissions.  Also, if
      we're already retransmitting to a peer, probabalistically tail drop new
      messages targetting that peer, based on the estimated wait time before
      transmission.
    * Fixed the rounding error in the inbound tunnel drop probability.
This commit is contained in:
jrandom
2006-04-11 13:39:06 +00:00
committed by zzz
parent b4fc6ca31b
commit 83bef43fd5
5 changed files with 64 additions and 13 deletions

View File

@ -1,4 +1,12 @@
$Id: history.txt,v 1.450 2006/04/08 20:14:09 jrandom Exp $
$Id: history.txt,v 1.451 2006/04/10 00:37:29 jrandom Exp $
2006-04-11 jrandom
* Throttling improvements on SSU - throttle all transmissions to a peer
when we are retransmitting, not just retransmissions. Also, if
we're already retransmitting to a peer, probabalistically tail drop new
messages targetting that peer, based on the estimated wait time before
transmission.
* Fixed the rounding error in the inbound tunnel drop probability.
2006-04-10 jrandom
* Include a combined send/receive graph (good idea cervantes!)

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.390 $ $Date: 2006/04/08 20:14:10 $";
public final static String ID = "$Revision: 1.391 $ $Date: 2006/04/10 00:37:31 $";
public final static String VERSION = "0.6.1.14";
public final static long BUILD = 5;
public final static long BUILD = 6;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -203,6 +203,8 @@ public abstract class TransportImpl implements Transport {
+ msg.getMessageType() + " message with selector " + selector, new Exception("fail cause"));
if (msg.getOnFailedSendJob() != null)
_context.jobQueue().addJob(msg.getOnFailedSendJob());
if (msg.getOnFailedReplyJob() != null)
_context.jobQueue().addJob(msg.getOnFailedReplyJob());
if (selector != null)
_context.messageRegistry().unregisterPending(msg);
log = true;

View File

@ -10,19 +10,19 @@ import java.util.Map;
import java.net.InetAddress;
import java.net.UnknownHostException;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.util.Log;
import net.i2p.router.RouterContext;
import net.i2p.router.OutNetMessage;
import net.i2p.router.Job;
/**
* Contain all of the state about a UDP connection to a peer.
*
*/
public class PeerState {
private I2PAppContext _context;
private RouterContext _context;
private Log _log;
/**
* The peer are we talking to. This should be set as soon as this
@ -216,7 +216,7 @@ public class PeerState {
/** override the default MTU */
private static final String PROP_DEFAULT_MTU = "i2np.udp.mtu";
public PeerState(I2PAppContext ctx, UDPTransport transport) {
public PeerState(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(PeerState.class);
_transport = transport;
@ -278,6 +278,8 @@ public class PeerState {
_context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
private int getDefaultMTU() {
@ -874,7 +876,7 @@ public class PeerState {
double retransPct = 0;
if (_packetsTransmitted > 10) {
retransPct = (double)_packetsRetransmitted/(double)_packetsTransmitted;
boolean wantLarge = retransPct < .50d; // heuristic to allow fairly lossy links to use large MTUs
boolean wantLarge = retransPct < .30d; // heuristic to allow fairly lossy links to use large MTUs
if (wantLarge && _mtu != LARGE_MTU) {
if (_context.random().nextLong(_mtuDecreases) <= 0) {
_mtu = LARGE_MTU;
@ -997,7 +999,6 @@ public class PeerState {
}
public RemoteHostId getRemoteHostId() { return _remoteHostId; }
public int add(OutboundMessageState state) {
if (_dead) {
@ -1009,10 +1010,50 @@ public class PeerState {
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
List msgs = _outboundMessages;
if (msgs == null) return 0;
int rv = 0;
boolean fail = false;
synchronized (msgs) {
msgs.add(state);
return msgs.size();
if (_retransmitter != null) {
long lifetime = _retransmitter.getLifetime();
long totalLifetime = lifetime;
for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter
OutboundMessageState cur = (OutboundMessageState)msgs.get(i);
totalLifetime += cur.getLifetime();
}
long remaining = -1;
OutNetMessage omsg = state.getMessage();
if (omsg != null)
remaining = omsg.getExpiration() - _context.clock().now();
else
remaining = 10*1000 - state.getLifetime();
if (remaining <= 0)
remaining = 1; // total lifetime will exceed it anyway, guaranteeing failure
float pDrop = totalLifetime / (float)remaining;
pDrop = pDrop * pDrop * pDrop;
if (pDrop >= _context.random().nextFloat()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Proactively tail dropping for " + _remotePeer.toBase64() + " (messages=" + msgs.size()
+ " headLifetime=" + lifetime + " totalLifetime=" + totalLifetime + " curLifetime=" + state.getLifetime()
+ " remaining=" + remaining + " pDrop=" + pDrop + ")");
_context.statManager().addRateData("udp.queueDropSize", msgs.size(), totalLifetime);
fail = true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Probabalistically allowing for " + _remotePeer.toBase64() + " (messages=" + msgs.size()
+ " headLifetime=" + lifetime + " totalLifetime=" + totalLifetime + " curLifetime=" + state.getLifetime()
+ " remaining=" + remaining + " pDrop=" + pDrop + ")");
_context.statManager().addRateData("udp.queueAllowTotalLifetime", totalLifetime, lifetime);
msgs.add(state);
}
} else {
msgs.add(state);
}
rv = msgs.size();
}
if (fail)
_transport.failed(state, false);
return rv;
}
/** drop all outbound messages */
public void dropOutbound() {
@ -1202,7 +1243,7 @@ public class PeerState {
* mind bw/cwin throttle, etc)
*
*/
private static final boolean THROTTLE_INITIAL_SEND = false;
private static final boolean THROTTLE_INITIAL_SEND = true;
private static final int SSU_HEADER_SIZE = 46;
static final int UDP_HEADER_SIZE = 8;

View File

@ -603,8 +603,8 @@ class BuildHandler {
_context.statManager().addRateData("tunnel.dropLoadBacklog", _inboundBuildMessages.size(), _inboundBuildMessages.size());
} else {
int queueTime = estimateQueueTime(_inboundBuildMessages.size());
float pDrop = queueTime/(BuildRequestor.REQUEST_TIMEOUT/2);
pDrop = pDrop * pDrop * pDrop;
float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT/2);
pDrop = pDrop * pDrop;
float f = _context.random().nextFloat();
if (pDrop > f) {
_context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, _inboundBuildMessages.size());