From 83bef43fd5b93c99e0dc7b2eec6c4c2b83520b5f Mon Sep 17 00:00:00 2001 From: jrandom Date: Tue, 11 Apr 2006 13:39:06 +0000 Subject: [PATCH] 2006-04-11 jrandom * Throttling improvements on SSU - throttle all transmissions to a peer when we are retransmitting, not just retransmissions. Also, if we're already retransmitting to a peer, probabalistically tail drop new messages targetting that peer, based on the estimated wait time before transmission. * Fixed the rounding error in the inbound tunnel drop probability. --- history.txt | 10 +++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../i2p/router/transport/TransportImpl.java | 2 + .../i2p/router/transport/udp/PeerState.java | 57 ++++++++++++++++--- .../i2p/router/tunnel/pool/BuildHandler.java | 4 +- 5 files changed, 64 insertions(+), 13 deletions(-) diff --git a/history.txt b/history.txt index 1e890b2aad..0dfbcc361c 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,12 @@ -$Id: history.txt,v 1.450 2006/04/08 20:14:09 jrandom Exp $ +$Id: history.txt,v 1.451 2006/04/10 00:37:29 jrandom Exp $ + +2006-04-11 jrandom + * Throttling improvements on SSU - throttle all transmissions to a peer + when we are retransmitting, not just retransmissions. Also, if + we're already retransmitting to a peer, probabalistically tail drop new + messages targetting that peer, based on the estimated wait time before + transmission. + * Fixed the rounding error in the inbound tunnel drop probability. 2006-04-10 jrandom * Include a combined send/receive graph (good idea cervantes!) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 235bed4171..950c765cf7 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.390 $ $Date: 2006/04/08 20:14:10 $"; + public final static String ID = "$Revision: 1.391 $ $Date: 2006/04/10 00:37:31 $"; public final static String VERSION = "0.6.1.14"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 1717695bd0..bff923d786 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -203,6 +203,8 @@ public abstract class TransportImpl implements Transport { + msg.getMessageType() + " message with selector " + selector, new Exception("fail cause")); if (msg.getOnFailedSendJob() != null) _context.jobQueue().addJob(msg.getOnFailedSendJob()); + if (msg.getOnFailedReplyJob() != null) + _context.jobQueue().addJob(msg.getOnFailedReplyJob()); if (selector != null) _context.messageRegistry().unregisterPending(msg); log = true; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index b4c993197c..41665befe0 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -10,19 +10,19 @@ import java.util.Map; import java.net.InetAddress; import java.net.UnknownHostException; -import net.i2p.I2PAppContext; import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.util.Log; import net.i2p.router.RouterContext; import net.i2p.router.OutNetMessage; +import net.i2p.router.Job; /** * Contain all of the state about a UDP connection to a peer. * */ public class PeerState { - private I2PAppContext _context; + private RouterContext _context; private Log _log; /** * The peer are we talking to. This should be set as soon as this @@ -216,7 +216,7 @@ public class PeerState { /** override the default MTU */ private static final String PROP_DEFAULT_MTU = "i2np.udp.mtu"; - public PeerState(I2PAppContext ctx, UDPTransport transport) { + public PeerState(RouterContext ctx, UDPTransport transport) { _context = ctx; _log = ctx.logManager().getLog(PeerState.class); _transport = transport; @@ -278,6 +278,8 @@ public class PeerState { _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); } private int getDefaultMTU() { @@ -874,7 +876,7 @@ public class PeerState { double retransPct = 0; if (_packetsTransmitted > 10) { retransPct = (double)_packetsRetransmitted/(double)_packetsTransmitted; - boolean wantLarge = retransPct < .50d; // heuristic to allow fairly lossy links to use large MTUs + boolean wantLarge = retransPct < .30d; // heuristic to allow fairly lossy links to use large MTUs if (wantLarge && _mtu != LARGE_MTU) { if (_context.random().nextLong(_mtuDecreases) <= 0) { _mtu = LARGE_MTU; @@ -997,7 +999,6 @@ public class PeerState { } public RemoteHostId getRemoteHostId() { return _remoteHostId; } - public int add(OutboundMessageState state) { if (_dead) { @@ -1009,10 +1010,50 @@ public class PeerState { _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); List msgs = _outboundMessages; if (msgs == null) return 0; + int rv = 0; + boolean fail = false; synchronized (msgs) { - msgs.add(state); - return msgs.size(); + if (_retransmitter != null) { + long lifetime = _retransmitter.getLifetime(); + long totalLifetime = lifetime; + for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter + OutboundMessageState cur = (OutboundMessageState)msgs.get(i); + totalLifetime += cur.getLifetime(); + } + long remaining = -1; + OutNetMessage omsg = state.getMessage(); + if (omsg != null) + remaining = omsg.getExpiration() - _context.clock().now(); + else + remaining = 10*1000 - state.getLifetime(); + + if (remaining <= 0) + remaining = 1; // total lifetime will exceed it anyway, guaranteeing failure + float pDrop = totalLifetime / (float)remaining; + pDrop = pDrop * pDrop * pDrop; + if (pDrop >= _context.random().nextFloat()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Proactively tail dropping for " + _remotePeer.toBase64() + " (messages=" + msgs.size() + + " headLifetime=" + lifetime + " totalLifetime=" + totalLifetime + " curLifetime=" + state.getLifetime() + + " remaining=" + remaining + " pDrop=" + pDrop + ")"); + _context.statManager().addRateData("udp.queueDropSize", msgs.size(), totalLifetime); + fail = true; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Probabalistically allowing for " + _remotePeer.toBase64() + " (messages=" + msgs.size() + + " headLifetime=" + lifetime + " totalLifetime=" + totalLifetime + " curLifetime=" + state.getLifetime() + + " remaining=" + remaining + " pDrop=" + pDrop + ")"); + _context.statManager().addRateData("udp.queueAllowTotalLifetime", totalLifetime, lifetime); + msgs.add(state); + } + } else { + msgs.add(state); + } + rv = msgs.size(); } + if (fail) + _transport.failed(state, false); + return rv; } /** drop all outbound messages */ public void dropOutbound() { @@ -1202,7 +1243,7 @@ public class PeerState { * mind bw/cwin throttle, etc) * */ - private static final boolean THROTTLE_INITIAL_SEND = false; + private static final boolean THROTTLE_INITIAL_SEND = true; private static final int SSU_HEADER_SIZE = 46; static final int UDP_HEADER_SIZE = 8; diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index 0625054ef8..186cb3a974 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -603,8 +603,8 @@ class BuildHandler { _context.statManager().addRateData("tunnel.dropLoadBacklog", _inboundBuildMessages.size(), _inboundBuildMessages.size()); } else { int queueTime = estimateQueueTime(_inboundBuildMessages.size()); - float pDrop = queueTime/(BuildRequestor.REQUEST_TIMEOUT/2); - pDrop = pDrop * pDrop * pDrop; + float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT/2); + pDrop = pDrop * pDrop; float f = _context.random().nextFloat(); if (pDrop > f) { _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, _inboundBuildMessages.size());