* Tunnels: Implement random discard to enforce share limit

This commit is contained in:
zzz
2008-10-10 17:34:25 +00:00
parent 4c2d4144d1
commit 855293d673
8 changed files with 115 additions and 9 deletions

View File

@ -53,7 +53,7 @@ public class StatManager {
"tunnel.buildRatio.*,tunnel.buildFailure,tunnel.buildSuccess,tunnel.corruptMessage," +
"tunnel.decryptRequestTime,tunnel.fragmentedDropped,tunnel.participatingMessageCount,"+
"tunnel.participatingTunnels,tunnel.testFailedTime,tunnel.testSuccessTime," +
"udp.sendPacketSize,udp.packetsRetransmitted" ;
"tunnel.participatingBandwidth,udp.sendPacketSize,udp.packetsRetransmitted" ;
/**
* The stat manager should only be constructed and accessed through the

View File

@ -1,3 +1,17 @@
2008-10-10 zzz
* Profiles: Reduce reject penalty in
capacity calculation to avoid a congestion collapse
* Throttle: Change reject to BANDWIDTH from CRIT on shutdown
for improved anonymity
* Tunnels: Implement random discard to enforce share limit
* Tunnel Tests: Add time for outbound delay, to avoid
congestion collapse
* UDPPacketReader: Adjust logging
* build files: Change to source=1.5, target=1.5
* configpeer.jsp: Table cleanup
* i2psnark: Change default tunnel length from 1+1 to 2+0
* peers.jsp: Change <,> to in,out for UDP
2008-10-09 sponge
* Update version to -3
* BOB database threadlocking fixes

View File

@ -17,7 +17,7 @@ import net.i2p.CoreVersion;
public class RouterVersion {
public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
public final static String VERSION = "0.6.4";
public final static long BUILD = 3;
public final static long BUILD = 4;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -28,6 +28,8 @@ public class HopConfig {
private Map _options;
private long _messagesProcessed;
private long _oldMessagesProcessed;
private long _messagesSent;
private long _oldMessagesSent;
/** IV length for {@link #getReplyIV} */
public static final int REPLY_IV_LENGTH = 16;
@ -44,6 +46,8 @@ public class HopConfig {
_options = null;
_messagesProcessed = 0;
_oldMessagesProcessed = 0;
_messagesSent = 0;
_oldMessagesSent = 0;
}
/** what tunnel ID are we receiving on? */
@ -115,6 +119,7 @@ public class HopConfig {
public void setOptions(Map options) { _options = options; }
/** take note of a message being pumped through this tunnel */
/** "processed" is for incoming and "sent" is for outgoing (could be dropped in between) */
public void incrementProcessedMessages() { _messagesProcessed++; }
public long getProcessedMessagesCount() { return _messagesProcessed; }
public long getRecentMessagesCount() {
@ -122,6 +127,13 @@ public class HopConfig {
_oldMessagesProcessed = _messagesProcessed;
return rv;
}
public void incrementSentMessages() { _messagesSent++; }
public long getSentMessagesCount() { return _messagesSent; }
public long getRecentSentMessagesCount() {
long rv = _messagesSent - _oldMessagesSent;
_oldMessagesSent = _messagesSent;
return rv;
}
public String toString() {
StringBuffer buf = new StringBuffer(64);

View File

@ -22,6 +22,8 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver {
return receiveEncrypted(encrypted, false);
}
public long receiveEncrypted(byte[] encrypted, boolean alreadySearched) {
if (!alreadySearched)
_config.incrementProcessedMessages();
if (_target == null) {
_target = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (_target == null) {
@ -33,7 +35,9 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver {
}
}
_config.incrementProcessedMessages();
if (_context.tunnelDispatcher().shouldDropParticipatingMessage())
return -1;
_config.incrementSentMessages();
TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted);
msg.setTunnelId(_config.getSendTunnel());

View File

@ -41,6 +41,11 @@ public class OutboundTunnelEndpoint {
+ " to be forwarded on to "
+ (toRouter != null ? toRouter.toBase64().substring(0,4) : "")
+ (toTunnel != null ? toTunnel.getTunnelId() + "" : ""));
// don't drop it if we are the target
if ((!_context.routerHash().equals(toRouter)) &&
_context.tunnelDispatcher().shouldDropParticipatingMessage())
return;
_config.incrementSentMessages();
_outDistributor.distribute(msg, toRouter, toTunnel);
}
}

View File

@ -18,6 +18,8 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.Service;
import net.i2p.router.peermanager.PeerProfile;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
/**
@ -110,6 +112,12 @@ public class TunnelDispatcher implements Service {
ctx.statManager().createRateStat("tunnel.participatingBandwidth",
"Participating traffic", "Tunnels",
new long[] { 60*1000l, 60*10*1000l });
ctx.statManager().createRateStat("tunnel.participatingBandwidthOut",
"Participating traffic", "Tunnels",
new long[] { 60*1000l, 60*10*1000l });
ctx.statManager().createRateStat("tunnel.participatingMessageDropped",
"Dropped for exceeding share limit", "Tunnels",
new long[] { 60*1000l, 60*10*1000l });
ctx.statManager().createRateStat("tunnel.participatingMessageCount",
"How many messages are sent through a participating tunnel?", "Tunnels",
new long[] { 60*1000l, 60*10*1000l, 60*60*1000l });
@ -550,6 +558,7 @@ public class TunnelDispatcher implements Service {
int size = participating.size();
long count = 0;
long bw = 0;
long bwOut = 0;
long tcount = 0;
long tooYoung = _context.clock().now() - 60*1000;
long tooOld = tooYoung - 9*60*1000;
@ -557,6 +566,7 @@ public class TunnelDispatcher implements Service {
HopConfig cfg = (HopConfig)participating.get(i);
long c = cfg.getRecentMessagesCount();
bw += c;
bwOut += cfg.getRecentSentMessagesCount();
long created = cfg.getCreation();
if (created > tooYoung || created < tooOld)
continue;
@ -567,9 +577,64 @@ public class TunnelDispatcher implements Service {
count = count * 30 / tcount;
_context.statManager().addRateData("tunnel.participatingMessageCount", count, 20*1000);
_context.statManager().addRateData("tunnel.participatingBandwidth", bw*1024/20, 20*1000);
_context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/20, 20*1000);
_context.statManager().addRateData("tunnel.participatingTunnels", size, 0);
}
/**
* Implement random early discard (RED) to enforce the share bandwidth limit.
* For now, this does not enforce the available bandwidth,
* we leave that to Throttle.
* This is similar to the code in ../RouterThrottleImpl.java
* We drop in proportion to how far over the limit we are.
* Perhaps an exponential function would be better?
*/
public boolean shouldDropParticipatingMessage() {
RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth");
if (rs == null)
return false;
Rate r = rs.getRate(60*1000);
if (r == null)
return false;
// weight current period higher
long count = r.getLastEventCount() + (3 * r.getCurrentEventCount());
int bw = 0;
if (count > 0)
bw = (int) ((r.getLastTotalValue() + (3 * r.getCurrentTotalValue())) / count);
else
bw = (int) r.getLifetimeAverageValue();
int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn());
usedIn = Math.min(usedIn, bw);
if (usedIn <= 0)
return false;
int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true));
usedOut = Math.min(usedOut, bw);
if (usedOut <= 0)
return false;
int used = Math.min(usedIn, usedOut);
int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(),
_context.bandwidthLimiter().getOutboundKBytesPerSecond());
float share = (float) _context.router().getSharePercentage();
// start dropping at 95% of the limit
float maxBps = maxKBps * share * 1024f * 0.95f;
float pctDrop = (used - maxBps) / used;
if (pctDrop <= 0)
return false;
float rand = _context.random().nextFloat();
boolean reject = rand <= pctDrop;
if (reject) {
if (_log.shouldLog(Log.WARN)) {
int availBps = (int) (((maxKBps*1024)*share) - used);
_log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/"
+ used + " %Drop = " + pctDrop);
}
_context.statManager().addRateData("tunnel.participatingMessageDropped", 1, 0);
}
return reject;
}
private static final int DROP_BASE_INTERVAL = 40 * 1000;
private static final int DROP_RANDOM_BOOST = 10 * 1000;
@ -685,9 +750,9 @@ public class TunnelDispatcher implements Service {
// already scheduled for the future, and before this expiration
}
}
if (_log.shouldLog(Log.INFO)) {
if (_log.shouldLog(Log.DEBUG)) {
long now = getContext().clock().now();
_log.info("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg);
_log.debug("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg);
}
}

View File

@ -75,6 +75,7 @@ public class TunnelParticipant {
}
if ( (_config != null) && (_config.getSendTo() != null) ) {
_config.incrementProcessedMessages();
RouterInfo ri = _nextHopCache;
if (ri == null)
ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
@ -82,10 +83,10 @@ public class TunnelParticipant {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send off to nextHop directly (" + _config.getSendTo().toBase64().substring(0,4)
+ " for " + msg);
_config.incrementProcessedMessages();
send(_config, msg, ri);
if (_config != null)
incrementThroughput(_config.getReceiveFrom());
// see comments below
//if (_config != null)
// incrementThroughput(_config.getReceiveFrom());
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4)
@ -109,6 +110,7 @@ public class TunnelParticipant {
* influence who we spend our time profiling is dangerous, so this will be disabled for
* now.
*/
/****
private void incrementThroughput(Hash prev) {
if (true) return;
long now = System.currentTimeMillis();
@ -123,6 +125,7 @@ public class TunnelParticipant {
_periodMessagesTransferred++;
}
}
****/
public int getCompleteCount() {
if (_handler != null)
@ -147,6 +150,9 @@ public class TunnelParticipant {
}
private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) {
if (_context.tunnelDispatcher().shouldDropParticipatingMessage())
return;
_config.incrementSentMessages();
long oldId = msg.getUniqueId();
long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE);
_context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);
@ -221,4 +227,4 @@ public class TunnelParticipant {
return "inbound endpoint";
}
}
}
}