2005-11-23 jrandom

* Removed spurious streaming lib RTO increase (it wasn't helpful)
    * Streamlined the tunnel batching to schedule batch transmissions more
      appropriately.
    * Default tunnel pool variance to 2 +0-1 hops
This commit is contained in:
jrandom
2005-11-23 16:04:52 +00:00
committed by zzz
parent 5e094b43b3
commit 2b841ad667
10 changed files with 193 additions and 43 deletions

View File

@ -72,7 +72,7 @@ public class Connection {
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 15*1000;
public static final long MAX_RESEND_DELAY = 10*1000;
public static final long MIN_RESEND_DELAY = 2*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
@ -992,7 +992,7 @@ public class Connection {
newWindowSize = 1;
// setRTT has its own ceiling
getOptions().setRTT(getOptions().getRTT() + 10*1000);
//getOptions().setRTT(getOptions().getRTT() + 10*1000);
getOptions().setWindowSize(newWindowSize);
if (_log.shouldLog(Log.WARN))

View File

@ -80,7 +80,7 @@ public class ConnectionPacketHandler {
if (packet.getOptionalDelay() > 60000) {
// requested choke
choke = true;
con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
//con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
}
}
@ -272,7 +272,7 @@ public class ConnectionPacketHandler {
oldSize = 1;
// setRTT has its own ceiling
con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
//con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
con.getOptions().setWindowSize(oldSize);
if (_log.shouldLog(Log.DEBUG))

View File

@ -1,4 +1,10 @@
$Id: history.txt,v 1.325 2005/11/19 23:42:17 jrandom Exp $
$Id: history.txt,v 1.326 2005/11/21 09:37:10 jrandom Exp $
2005-11-23 jrandom
* Removed spurious streaming lib RTO increase (it wasn't helpful)
* Streamlined the tunnel batching to schedule batch transmissions more
appropriately.
* Default tunnel pool variance to 2 +0-1 hops
2005-11-21 jrandom
* IE doesn't strip SPAN from <button> form fields, so add in a workaround

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.293 $ $Date: 2005/11/19 23:42:17 $";
public final static String ID = "$Revision: 1.294 $ $Date: 2005/11/21 09:37:09 $";
public final static String VERSION = "0.6.1.5";
public final static long BUILD = 4;
public final static long BUILD = 5;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -44,7 +44,7 @@ public class TunnelPoolSettings {
public static final int DEFAULT_REBUILD_PERIOD = 60*1000;
public static final int DEFAULT_DURATION = 10*60*1000;
public static final int DEFAULT_LENGTH = 2;
public static final int DEFAULT_LENGTH_VARIANCE = -1;
public static final int DEFAULT_LENGTH_VARIANCE = 1;
public static final boolean DEFAULT_ALLOW_ZERO_HOP = true;
public TunnelPoolSettings() {

View File

@ -19,14 +19,20 @@ import net.i2p.util.Log;
public class BatchedPreprocessor extends TrivialPreprocessor {
private Log _log;
private long _pendingSince;
private String _name;
public BatchedPreprocessor(I2PAppContext ctx) {
public BatchedPreprocessor(I2PAppContext ctx, String name) {
super(ctx);
_log = ctx.logManager().getLog(BatchedPreprocessor.class);
_name = name;
_pendingSince = 0;
ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
private static final int FULL_SIZE = PREPROCESSED_SIZE
@ -37,14 +43,32 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
private static final boolean DISABLE_BATCHING = false;
/* not final or private so the test code can adjust */
static long DEFAULT_DELAY = 500;
static long DEFAULT_DELAY = 100;
/** wait up to 2 seconds before sending a small message */
protected long getSendDelay() { return DEFAULT_DELAY; }
/** if we have 50 messages queued that are too small, flush them anyway */
private static final int FORCE_BATCH_FLUSH = 50;
/** how long do we want to wait before flushing */
public long getDelayAmount() { return getDelayAmount(true); }
private long getDelayAmount(boolean shouldStat) {
long rv = -1;
long defaultAmount = getSendDelay();
if (_pendingSince > 0)
rv = _pendingSince + defaultAmount - _context.clock().now();
if (rv > defaultAmount)
rv = defaultAmount;
if (shouldStat)
_context.statManager().addRateData("tunnel.batchDelayAmount", rv, 0);
return rv;
}
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Preprocess queue with " + pending.size() + " to send");
if (false) {
if (DISABLE_BATCHING || getSendDelay() <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info("No batching, send all messages immediately");
@ -59,7 +83,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
}
return false;
}
}
int batchCount = 0;
int beforeLooping = pending.size();
while (pending.size() > 0) {
int allocated = 0;
for (int i = 0; i < pending.size(); i++) {
@ -78,39 +106,47 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Pushback of " + curWanted + " (message " + (i+1) + ")");
}
if (_pendingSince > 0)
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0);
if (_pendingSince > 0) {
long waited = _context.clock().now() - _pendingSince;
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited);
}
_pendingSince = 0;
send(pending, 0, i, sender, rec);
if (_log.shouldLog(Log.INFO))
_log.info("Allocated=" + allocated + " so we sent " + (i+1)
+ " (last complete? " + (msg.getOffset() >= msg.getData().length) + ")");
+ " (last complete? " + (msg.getOffset() >= msg.getData().length)
+ ", off=" + msg.getOffset() + ", count=" + pending.size() + ")");
for (int j = 0; j < i; j++) {
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
if (cur.getOffset() < cur.getData().length)
throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
+ " len=" + cur.getData().length + " alloc=" + allocated);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
if (msg.getOffset() >= msg.getData().length) {
// ok, this last message fit perfectly, remove it too
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
if (i > 0)
_context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0);
allocated = 0;
break;
// don't break - we may have enough source messages for multiple full tunnel messages
//break;
batchCount++;
}
}
display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
if (allocated > 0) {
// after going through the entire pending list, we still don't
// have enough data to send a full message
if ( (_pendingSince > 0) && (_pendingSince + getSendDelay() <= _context.clock().now()) ) {
if (_log.shouldLog(Log.INFO))
_log.info("Passed through pending list, with " + allocated + "/" + pending.size()
+ " left to clean up, but we've waited, so flush");
if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) {
// not even a full message, but we want to flush it anyway
if (pending.size() > 1)
@ -119,21 +155,37 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
send(pending, 0, pending.size()-1, sender, rec);
while (pending.size() > 0) {
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
int beforeSize = pending.size();
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i);
if (cur.getOffset() >= cur.getData().length) {
pending.remove(i);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
i--;
}
}
if (pending.size() > 0) {
_pendingSince = _context.clock().now();
_context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize);
display(allocated, pending, "flushed, some remain");
return true;
} else {
long delayAmount = _context.clock().now() - _pendingSince;
_pendingSince = 0;
if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount);
return false;
}
_pendingSince = 0;
return false;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Passed through pending list, with " + allocated + "/"+ pending.size()
+ " left to clean up, but we've haven't waited, so don't flush (wait="
+ (_context.clock().now() - _pendingSince) + " / " + _pendingSince + ")");
_context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0);
if (_pendingSince <= 0)
_pendingSince = _context.clock().now();
if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
// not yet time to send the delayed flush
display(allocated, pending, "dont flush");
return true;
}
} else {
@ -149,6 +201,29 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
return false;
}
private void display(long allocated, List pending, String title) {
if (_log.shouldLog(Log.INFO)) {
long highestDelay = 0;
StringBuffer buf = new StringBuffer();
buf.append(_name).append(": ");
buf.append(title);
buf.append(" allocated: ").append(allocated);
buf.append(" pending: ").append(pending.size());
if (_pendingSince > 0)
buf.append(" delay: ").append(getDelayAmount(false));
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending curPending = (TunnelGateway.Pending)pending.get(i);
buf.append(" pending[").append(i).append("]: ");
buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/');
buf.append(curPending.getLifetime());
if (curPending.getLifetime() > highestDelay)
highestDelay = curPending.getLifetime();
}
_log.info(buf.toString());
}
}
/**
* Preprocess the messages from the pending list, grouping items startAt
* through sendThrough (though only part of the last one may be fully

View File

@ -18,21 +18,41 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
*/
public static final String PROP_BATCH_FREQUENCY = "batchFrequency";
public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency";
public static final int DEFAULT_BATCH_FREQUENCY = 500;
public static final int DEFAULT_BATCH_FREQUENCY = 100;
public BatchedRouterPreprocessor(RouterContext ctx) {
this(ctx, (HopConfig)null);
}
public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) {
super(ctx);
super(ctx, getName(cfg));
_routerContext = ctx;
_config = cfg;
}
public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) {
super(ctx);
super(ctx, getName(cfg));
_routerContext = ctx;
_hopConfig = cfg;
}
private static String getName(HopConfig cfg) {
if (cfg == null) return "[unknown]";
if (cfg.getReceiveTunnel() != null)
return cfg.getReceiveTunnel().getTunnelId() + "";
else if (cfg.getSendTunnel() != null)
return cfg.getSendTunnel().getTunnelId() + "";
else
return "[n/a]";
}
private static String getName(TunnelCreatorConfig cfg) {
if (cfg == null) return "[unknown]";
if (cfg.getReceiveTunnelId(0) != null)
return cfg.getReceiveTunnelId(0).getTunnelId() + "";
else if (cfg.getSendTunnelId(0) != null)
return cfg.getSendTunnelId(0).getTunnelId() + "";
else
return "[n/a]";
}
/** how long should we wait before flushing */
protected long getSendDelay() {
@ -41,9 +61,9 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
Properties opts = _config.getOptions();
if (opts != null)
freq = opts.getProperty(PROP_BATCH_FREQUENCY);
} else {
freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY);
}
if (freq == null)
freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY);
if (freq != null) {
try {

View File

@ -0,0 +1,12 @@
package net.i2p.router.tunnel;
import net.i2p.util.SimpleTimer;
/**
*
*/
class FlushTimer extends SimpleTimer {
private static final FlushTimer _instance = new FlushTimer();
public static final SimpleTimer getInstance() { return _instance; }
protected FlushTimer() { super("TunnelFlushTimer"); }
}

View File

@ -32,7 +32,10 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
_context = ctx;
_log = ctx.logManager().getLog(TrivialPreprocessor.class);
}
/** how long do we want to wait before flushing */
public long getDelayAmount() { return 0; }
/**
* Return true if there were messages remaining, and we should queue up
* a delayed flush to clear them

View File

@ -64,6 +64,8 @@ public class TunnelGateway {
_flushFrequency = 500;
_delayedFlush = new DelayedFlush();
_lastFlush = _context.clock().now();
_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
}
/**
@ -87,11 +89,18 @@ public class TunnelGateway {
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
_messagesSent++;
boolean delayedFlush = false;
long delayAmount = -1;
int remaining = 0;
long beforeLock = _context.clock().now();
long afterAdded = -1;
Pending cur = new PendingImpl(msg, toRouter, toTunnel);
synchronized (_queue) {
_queue.add(cur);
afterAdded = _context.clock().now();
delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
if (delayedFlush)
delayAmount = _preprocessor.getDelayAmount();
_lastFlush = _context.clock().now();
// expire any as necessary, even if its framented
@ -104,11 +113,13 @@ public class TunnelGateway {
i--;
}
}
remaining = _queue.size();
}
if (delayedFlush) {
SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency);
FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount);
}
_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
}
public int getMessagesSent() { return _messagesSent; }
@ -131,6 +142,9 @@ public class TunnelGateway {
* @return true if we should delay before preprocessing again
*/
public boolean preprocessQueue(List pending, Sender sender, Receiver receiver);
/** how long do we want to wait before flushing */
public long getDelayAmount();
}
public interface Receiver {
@ -148,8 +162,12 @@ public class TunnelGateway {
protected byte _remaining[];
protected int _offset;
protected int _fragmentNumber;
protected long _created;
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
this(message, toRouter, toTunnel, System.currentTimeMillis());
}
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) {
_toRouter = toRouter;
_toTunnel = toTunnel;
_messageId = message.getUniqueId();
@ -157,6 +175,7 @@ public class TunnelGateway {
_remaining = message.toByteArray();
_offset = 0;
_fragmentNumber = 0;
_created = now;
}
/** may be null */
public Hash getToRouter() { return _toRouter; }
@ -170,17 +189,15 @@ public class TunnelGateway {
public int getOffset() { return _offset; }
/** move the offset */
public void setOffset(int offset) { _offset = offset; }
public long getLifetime() { return System.currentTimeMillis()-_created; }
/** which fragment are we working on (0 for the first fragment) */
public int getFragmentNumber() { return _fragmentNumber; }
/** ok, fragment sent, increment what the next will be */
public void incrementFragmentNumber() { _fragmentNumber++; }
}
private class PendingImpl extends Pending {
private long _created;
public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel);
_created = _context.clock().now();
super(message, toRouter, toTunnel, _context.clock().now());
}
public String toString() {
@ -203,20 +220,37 @@ public class TunnelGateway {
buf.append(" frag ").append(_fragmentNumber);
return buf.toString();
}
public long getLifetime() { return _context.clock().now()-_created; }
}
private class DelayedFlush implements SimpleTimer.TimedEvent {
public void timeReached() {
boolean wantRequeue = false;
int remaining = 0;
long beforeLock = _context.clock().now();
long afterChecked = -1;
long delayAmount = -1;
if (_queue.size() > 10000) // stay out of the synchronized block
System.out.println("foo!");
synchronized (_queue) {
if (_queue.size() > 0)
if (_queue.size() > 10000) // stay in the synchronized block
System.out.println("foo!");
afterChecked = _context.clock().now();
if (_queue.size() > 0) {
wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
if (wantRequeue)
delayAmount = _preprocessor.getDelayAmount();
}
remaining = _queue.size();
}
if (wantRequeue)
SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency);
FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount);
else
_lastFlush = _context.clock().now();
_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
}
}
}