2005-02-26 jrandom

* Further streaming lib caching improvements
    * Reduce the minimum RTT (used to calculate retry timeouts), but also
      increase the RTT on resends.
    * Lower the default message size to 4KB from 16KB to further reduce the
      chance of failed fragmentation.
    * Extend tunnel rebuild throttling to include fallback rebuilds
    * If there are less than 20 routers known, don't drop the last 20 (to help
      avoid dropping all peers under catastrophic failures)
    * New stats for end to end messages - "client.leaseSetFoundLocally",
      "client.leaseSetFoundRemoteTime", and "client.leaseSetFailedRemoteTime"
This commit is contained in:
jrandom
2005-02-26 19:16:46 +00:00
committed by zzz
parent 4cec9da0a6
commit 238389fc7f
15 changed files with 84 additions and 39 deletions

View File

@ -76,7 +76,7 @@ public class Connection {
private long _lifetimeDupMessageReceived; private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 60*1000; public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000; public static final long MIN_RESEND_DELAY = 10*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */ /** wait up to 5 minutes after disconnection so we can ack/close packets */
public static int DISCONNECT_TIMEOUT = 5*60*1000; public static int DISCONNECT_TIMEOUT = 5*60*1000;
@ -870,6 +870,8 @@ public class Connection {
_log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize _log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize
+ ") for " + Connection.this.toString()); + ") for " + Connection.this.toString());
// setRTT has its own ceiling
getOptions().setRTT(getOptions().getRTT() + 30*1000);
getOptions().setWindowSize(newWindowSize); getOptions().setWindowSize(newWindowSize);
windowAdjusted(); windowAdjusted();
} }

View File

@ -328,6 +328,7 @@ public class ConnectionManager {
} }
_outboundQueue.enqueue(packet); _outboundQueue.enqueue(packet);
packet.releasePayload();
if (blocking) { if (blocking) {
synchronized (req) { synchronized (req) {

View File

@ -81,7 +81,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
super.init(opts); super.init(opts);
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 16*1024)); setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 4*1024));
setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000)); setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000));
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));

View File

@ -77,7 +77,7 @@ public class ConnectionPacketHandler {
+ ": dropping " + packet); + ": dropping " + packet);
ack(con, packet.getAckThrough(), packet.getNacks(), null, false); ack(con, packet.getAckThrough(), packet.getNacks(), null, false);
con.getOptions().setChoke(5*1000); con.getOptions().setChoke(5*1000);
_cache.release(packet.getPayload()); packet.releasePayload();
return; return;
} }
con.getOptions().setChoke(0); con.getOptions().setChoke(0);
@ -219,6 +219,8 @@ public class ConnectionPacketHandler {
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends + con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con); + ") for " + con);
// setRTT has its own ceiling
con.getOptions().setRTT(con.getOptions().getRTT() + 30*1000);
con.getOptions().setWindowSize(oldSize); con.getOptions().setWindowSize(oldSize);
congested = true; congested = true;

View File

@ -209,6 +209,8 @@ public class Packet {
/** get the actual payload of the message. may be null */ /** get the actual payload of the message. may be null */
public ByteArray getPayload() { return _payload; } public ByteArray getPayload() { return _payload; }
public void setPayload(ByteArray payload) { public void setPayload(ByteArray payload) {
if ( (_payload != null) && (_payload != payload) )
_cache.release(_payload);
_payload = payload; _payload = payload;
if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) )
throw new IllegalArgumentException("Too large payload: " + payload.getValid()); throw new IllegalArgumentException("Too large payload: " + payload.getValid());
@ -216,6 +218,11 @@ public class Packet {
public int getPayloadSize() { public int getPayloadSize() {
return (_payload == null ? 0 : _payload.getValid()); return (_payload == null ? 0 : _payload.getValid());
} }
public void releasePayload() {
if (_payload != null)
_cache.release(_payload);
_payload = null;
}
/** is a particular flag set on this packet? */ /** is a particular flag set on this packet? */
public boolean isFlagSet(int flag) { return 0 != (_flags & flag); } public boolean isFlagSet(int flag) { return 0 != (_flags & flag); }

View File

@ -155,12 +155,14 @@ public class PacketHandler {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Receive a syn packet with the wrong IDs, sending reset: " + packet); _log.warn("Receive a syn packet with the wrong IDs, sending reset: " + packet);
sendReset(packet); sendReset(packet);
packet.releasePayload();
} else { } else {
if (!con.getResetSent()) { if (!con.getResetSent()) {
// someone is sending us a packet on the wrong stream // someone is sending us a packet on the wrong stream
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Received a packet on the wrong stream: " + packet + " connection: " + con); _log.warn("Received a packet on the wrong stream: " + packet + " connection: " + con);
} }
packet.releasePayload();
} }
} }
} }
@ -187,6 +189,7 @@ public class PacketHandler {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Echo packet received with no stream IDs: " + packet); _log.warn("Echo packet received with no stream IDs: " + packet);
} }
packet.releasePayload();
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received on an unknown stream (and not an ECHO): " + packet); _log.debug("Packet received on an unknown stream (and not an ECHO): " + packet);
@ -221,6 +224,7 @@ public class PacketHandler {
+ buf.toString() + " sendId: " + buf.toString() + " sendId: "
+ (sendId != null ? Base64.encode(sendId) : " unknown")); + (sendId != null ? Base64.encode(sendId) : " unknown"));
} }
packet.releasePayload();
} }
} }
} }

View File

@ -82,32 +82,24 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
_numSends++; _numSends++;
_lastSend = _context.clock().now(); _lastSend = _context.clock().now();
} }
public void ackReceived() { public void ackReceived() {
ByteArray ba = null;
synchronized (this) { synchronized (this) {
if (_ackOn <= 0) if (_ackOn <= 0)
_ackOn = _context.clock().now(); _ackOn = _context.clock().now();
ba = getPayload(); releasePayload();
setPayload(null);
notifyAll(); notifyAll();
} }
SimpleTimer.getInstance().removeEvent(_resendEvent); SimpleTimer.getInstance().removeEvent(_resendEvent);
if (ba != null)
_cache.release(ba);
} }
public void cancelled() { public void cancelled() {
ByteArray ba = null;
synchronized (this) { synchronized (this) {
_cancelledOn = _context.clock().now(); _cancelledOn = _context.clock().now();
ba = getPayload(); releasePayload();
setPayload(null);
notifyAll(); notifyAll();
} }
SimpleTimer.getInstance().removeEvent(_resendEvent); SimpleTimer.getInstance().removeEvent(_resendEvent);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Cancelled! " + toString(), new Exception("cancelled")); _log.debug("Cancelled! " + toString(), new Exception("cancelled"));
if (ba != null)
_cache.release(ba);
} }
/** how long after packet creation was it acked? */ /** how long after packet creation was it acked? */
@ -144,10 +136,12 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
int window = _connection.getOptions().getWindowSize(); int window = _connection.getOptions().getWindowSize();
boolean accepted = _connection.packetSendChoke(maxWaitMs); boolean accepted = _connection.packetSendChoke(maxWaitMs);
long after = _context.clock().now(); long after = _context.clock().now();
if (accepted) if (accepted) {
_acceptedOn = after; _acceptedOn = after;
else } else {
_acceptedOn = -1; _acceptedOn = -1;
releasePayload();
}
int afterQueued = _connection.getUnackedPacketsSent(); int afterQueued = _connection.getUnackedPacketsSent();
if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Took " + (after-before) + "ms to get " _log.debug("Took " + (after-before) + "ms to get "
@ -162,11 +156,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
long expiration = _context.clock().now()+maxWaitMs; long expiration = _context.clock().now()+maxWaitMs;
while (true) { while (true) {
long timeRemaining = expiration - _context.clock().now(); long timeRemaining = expiration - _context.clock().now();
if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) return; if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break;
try { try {
synchronized (this) { synchronized (this) {
if (_ackOn > 0) return; if (_ackOn > 0) break;
if (_cancelledOn > 0) return; if (_cancelledOn > 0) break;
if (timeRemaining > 60*1000) if (timeRemaining > 60*1000)
timeRemaining = 60*1000; timeRemaining = 60*1000;
else if (timeRemaining <= 0) else if (timeRemaining <= 0)
@ -175,6 +169,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
} }
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
if (!writeSuccessful())
releasePayload();
} }
public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; }

View File

@ -25,7 +25,6 @@ class PacketQueue {
private I2PSession _session; private I2PSession _session;
private ConnectionManager _connectionManager; private ConnectionManager _connectionManager;
private ByteCache _cache = ByteCache.getInstance(64, 36*1024); private ByteCache _cache = ByteCache.getInstance(64, 36*1024);
private ByteCache _packetCache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE);
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
_context = context; _context = context;
@ -129,7 +128,13 @@ class PacketQueue {
if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
// ack only, so release it asap // ack only, so release it asap
_packetCache.release(packet.getPayload()); packet.releasePayload();
} else if (packet.isFlagSet(Packet.FLAG_ECHO) && !packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED) ) {
// pong
packet.releasePayload();
} else if (packet.isFlagSet(Packet.FLAG_RESET)) {
// reset
packet.releasePayload();
} }
} }

View File

@ -77,7 +77,7 @@ public class SimpleTimer {
totalEvents = _events.size(); totalEvents = _events.size();
_events.notifyAll(); _events.notifyAll();
} }
if (time.longValue() > eventTime + 5) { if (time.longValue() > eventTime + 100) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Lots of timer congestion, had to push " + event + " back " _log.error("Lots of timer congestion, had to push " + event + " back "
+ (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")"); + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")");

View File

@ -1,4 +1,16 @@
$Id: history.txt,v 1.156 2005/02/24 13:05:26 jrandom Exp $ $Id: history.txt,v 1.157 2005/02/24 18:53:35 jrandom Exp $
2005-02-26 jrandom
* Further streaming lib caching improvements
* Reduce the minimum RTT (used to calculate retry timeouts), but also
increase the RTT on resends.
* Lower the default message size to 4KB from 16KB to further reduce the
chance of failed fragmentation.
* Extend tunnel rebuild throttling to include fallback rebuilds
* If there are less than 20 routers known, don't drop the last 20 (to help
avoid dropping all peers under catastrophic failures)
* New stats for end to end messages - "client.leaseSetFoundLocally",
"client.leaseSetFoundRemoteTime", and "client.leaseSetFailedRemoteTime"
2005-02-24 jrandom 2005-02-24 jrandom
* Throttle the number of tunnel rebuilds per minute, preventing CPU * Throttle the number of tunnel rebuilds per minute, preventing CPU

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.151 $ $Date: 2005/02/24 13:05:26 $"; public final static String ID = "$Revision: 1.152 $ $Date: 2005/02/24 18:53:36 $";
public final static String VERSION = "0.5.0.1"; public final static String VERSION = "0.5.0.1";
public final static long BUILD = 3; public final static long BUILD = 4;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION); System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -62,6 +62,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private long _cloveId; private long _cloveId;
private long _start; private long _start;
private boolean _finished; private boolean _finished;
private long _leaseSetLookupBegin;
/** /**
* final timeout (in milliseconds) that the outbound message will fail in. * final timeout (in milliseconds) that the outbound message will fail in.
@ -110,13 +111,16 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look fora remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
_clientMessage = msg; _clientMessage = msg;
_clientMessageId = msg.getMessageId(); _clientMessageId = msg.getMessageId();
_clientMessageSize = msg.getPayload().getSize(); _clientMessageSize = msg.getPayload().getSize();
_from = msg.getFromDestination(); _from = msg.getFromDestination();
_to = msg.getDestination(); _to = msg.getDestination();
_leaseSetLookupBegin = -1;
String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM); String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
if (param == null) if (param == null)
@ -154,9 +158,15 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext()); LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job"); _log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job");
getContext().netDb().lookupLeaseSet(key, success, failed, timeoutMs); LeaseSet ls = getContext().netDb().lookupLeaseSetLocally(key);
if (_log.shouldLog(Log.DEBUG)) if (ls != null) {
_log.debug(getJobId() + ": after sending off leaseSet lookup job"); getContext().statManager().addRateData("client.leaseSetFoundLocally", 1, 0);
_leaseSetLookupBegin = -1;
success.runJob();
} else {
_leaseSetLookupBegin = getContext().clock().now();
getContext().netDb().lookupLeaseSet(key, success, failed, timeoutMs);
}
} }
private boolean getShouldBundle() { private boolean getShouldBundle() {
@ -189,6 +199,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
} }
public String getName() { return "Send outbound client message through the lease"; } public String getName() { return "Send outbound client message through the lease"; }
public void runJob() { public void runJob() {
if (_leaseSetLookupBegin > 0) {
long lookupTime = getContext().clock().now() - _leaseSetLookupBegin;
getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, lookupTime);
}
boolean ok = getNextLease(); boolean ok = getNextLease();
if (ok) if (ok)
send(); send();
@ -262,7 +276,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
super(enclosingContext); super(enclosingContext);
} }
public String getName() { return "Lookup for outbound client message failed"; } public String getName() { return "Lookup for outbound client message failed"; }
public void runJob() { public void runJob() {
if (_leaseSetLookupBegin > 0) {
long lookupTime = getContext().clock().now() - _leaseSetLookupBegin;
getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime);
}
dieFatal(); dieFatal();
} }
} }

View File

@ -115,8 +115,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
public final static String PROP_DB_DIR = "router.networkDatabase.dbDir"; public final static String PROP_DB_DIR = "router.networkDatabase.dbDir";
public final static String DEFAULT_DB_DIR = "netDb"; public final static String DEFAULT_DB_DIR = "netDb";
/** if we have less than 5 routers left, don't drop any more, even if they're failing or doing bad shit */ /** if we have less than 20 routers left, don't drop any more, even if they're failing or doing bad shit */
private final static int MIN_REMAINING_ROUTERS = 5; private final static int MIN_REMAINING_ROUTERS = 20;
/** /**
* dont accept any dbDtore of a router over 6 hours old (unless we dont * dont accept any dbDtore of a router over 6 hours old (unless we dont

View File

@ -37,9 +37,6 @@ public class TunnelBuilder {
PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop); PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop);
if (cfg == null) { if (cfg == null) {
RetryJob j = new RetryJob(ctx, pool);
j.getTiming().setStartAfter(ctx.clock().now() + ctx.random().nextInt(30*1000));
ctx.jobQueue().addJob(j);
return; return;
} }
OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg); OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg);

View File

@ -338,8 +338,8 @@ public class TunnelPool {
_log.info(toString() + ": building a fallback tunnel (usable: " + usable + " needed: " + quantity + ")"); _log.info(toString() + ": building a fallback tunnel (usable: " + usable + " needed: " + quantity + ")");
if ( (usable == 0) && (_settings.getAllowZeroHop()) ) if ( (usable == 0) && (_settings.getAllowZeroHop()) )
_builder.buildTunnel(_context, this, true); _builder.buildTunnel(_context, this, true);
else //else
_builder.buildTunnel(_context, this); // _builder.buildTunnel(_context, this);
refreshBuilders(); refreshBuilders();
} }
@ -433,7 +433,7 @@ public class TunnelPool {
if (!_alive) return; if (!_alive) return;
int added = refreshBuilders(); int added = refreshBuilders();
if ( (added > 0) && (_log.shouldLog(Log.WARN)) ) if ( (added > 0) && (_log.shouldLog(Log.WARN)) )
_log.warn("Passive rebuilding a tunnel"); _log.warn("Passive rebuilding a tunnel for " + TunnelPool.this.toString());
requeue(60*1000); requeue(60*1000);
} }
} }