diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 2132a150d4..34b12ca041 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -76,7 +76,7 @@ public class Connection { private long _lifetimeDupMessageReceived; public static final long MAX_RESEND_DELAY = 60*1000; - public static final long MIN_RESEND_DELAY = 20*1000; + public static final long MIN_RESEND_DELAY = 10*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ public static int DISCONNECT_TIMEOUT = 5*60*1000; @@ -870,6 +870,8 @@ public class Connection { _log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize + ") for " + Connection.this.toString()); + // setRTT has its own ceiling + getOptions().setRTT(getOptions().getRTT() + 30*1000); getOptions().setWindowSize(newWindowSize); windowAdjusted(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index e1d59abce0..cdfa6b2de9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -328,6 +328,7 @@ public class ConnectionManager { } _outboundQueue.enqueue(packet); + packet.releasePayload(); if (blocking) { synchronized (req) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 5757e39878..1adf4a3cee 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -81,7 +81,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { super.init(opts); setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); - setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 16*1024)); + setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 4*1024)); setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000)); setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 4cdf9ff334..6d0c4a2dfe 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -77,7 +77,7 @@ public class ConnectionPacketHandler { + ": dropping " + packet); ack(con, packet.getAckThrough(), packet.getNacks(), null, false); con.getOptions().setChoke(5*1000); - _cache.release(packet.getPayload()); + packet.releasePayload(); return; } con.getOptions().setChoke(0); @@ -219,6 +219,8 @@ public class ConnectionPacketHandler { + con.getLastCongestionSeenAt() + " (#resends: " + numResends + ") for " + con); + // setRTT has its own ceiling + con.getOptions().setRTT(con.getOptions().getRTT() + 30*1000); con.getOptions().setWindowSize(oldSize); congested = true; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index f346a0bffe..fa53f30c9c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -209,6 +209,8 @@ public class Packet { /** get the actual payload of the message. may be null */ public ByteArray getPayload() { return _payload; } public void setPayload(ByteArray payload) { + if ( (_payload != null) && (_payload != payload) ) + _cache.release(_payload); _payload = payload; if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) throw new IllegalArgumentException("Too large payload: " + payload.getValid()); @@ -216,6 +218,11 @@ public class Packet { public int getPayloadSize() { return (_payload == null ? 0 : _payload.getValid()); } + public void releasePayload() { + if (_payload != null) + _cache.release(_payload); + _payload = null; + } /** is a particular flag set on this packet? */ public boolean isFlagSet(int flag) { return 0 != (_flags & flag); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 09f3590523..083ef68442 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -155,12 +155,14 @@ public class PacketHandler { if (_log.shouldLog(Log.WARN)) _log.warn("Receive a syn packet with the wrong IDs, sending reset: " + packet); sendReset(packet); + packet.releasePayload(); } else { if (!con.getResetSent()) { // someone is sending us a packet on the wrong stream if (_log.shouldLog(Log.WARN)) _log.warn("Received a packet on the wrong stream: " + packet + " connection: " + con); } + packet.releasePayload(); } } } @@ -187,6 +189,7 @@ public class PacketHandler { if (_log.shouldLog(Log.WARN)) _log.warn("Echo packet received with no stream IDs: " + packet); } + packet.releasePayload(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received on an unknown stream (and not an ECHO): " + packet); @@ -221,6 +224,7 @@ public class PacketHandler { + buf.toString() + " sendId: " + (sendId != null ? Base64.encode(sendId) : " unknown")); } + packet.releasePayload(); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index ee661b8895..c6698ab7f1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -82,32 +82,24 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat _numSends++; _lastSend = _context.clock().now(); } - public void ackReceived() { - ByteArray ba = null; + public void ackReceived() { synchronized (this) { if (_ackOn <= 0) - _ackOn = _context.clock().now(); - ba = getPayload(); - setPayload(null); + _ackOn = _context.clock().now(); + releasePayload(); notifyAll(); } SimpleTimer.getInstance().removeEvent(_resendEvent); - if (ba != null) - _cache.release(ba); } public void cancelled() { - ByteArray ba = null; synchronized (this) { _cancelledOn = _context.clock().now(); - ba = getPayload(); - setPayload(null); + releasePayload(); notifyAll(); } SimpleTimer.getInstance().removeEvent(_resendEvent); if (_log.shouldLog(Log.DEBUG)) _log.debug("Cancelled! " + toString(), new Exception("cancelled")); - if (ba != null) - _cache.release(ba); } /** how long after packet creation was it acked? */ @@ -144,10 +136,12 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat int window = _connection.getOptions().getWindowSize(); boolean accepted = _connection.packetSendChoke(maxWaitMs); long after = _context.clock().now(); - if (accepted) + if (accepted) { _acceptedOn = after; - else + } else { _acceptedOn = -1; + releasePayload(); + } int afterQueued = _connection.getUnackedPacketsSent(); if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) _log.debug("Took " + (after-before) + "ms to get " @@ -162,11 +156,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat long expiration = _context.clock().now()+maxWaitMs; while (true) { long timeRemaining = expiration - _context.clock().now(); - if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) return; + if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break; try { synchronized (this) { - if (_ackOn > 0) return; - if (_cancelledOn > 0) return; + if (_ackOn > 0) break; + if (_cancelledOn > 0) break; if (timeRemaining > 60*1000) timeRemaining = 60*1000; else if (timeRemaining <= 0) @@ -175,6 +169,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat } } catch (InterruptedException ie) {} } + if (!writeSuccessful()) + releasePayload(); } public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 6478ae09a3..1c8658275e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -25,7 +25,6 @@ class PacketQueue { private I2PSession _session; private ConnectionManager _connectionManager; private ByteCache _cache = ByteCache.getInstance(64, 36*1024); - private ByteCache _packetCache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE); public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { _context = context; @@ -129,7 +128,13 @@ class PacketQueue { if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { // ack only, so release it asap - _packetCache.release(packet.getPayload()); + packet.releasePayload(); + } else if (packet.isFlagSet(Packet.FLAG_ECHO) && !packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED) ) { + // pong + packet.releasePayload(); + } else if (packet.isFlagSet(Packet.FLAG_RESET)) { + // reset + packet.releasePayload(); } } diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index ac9b2b44dc..13931314ad 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -77,7 +77,7 @@ public class SimpleTimer { totalEvents = _events.size(); _events.notifyAll(); } - if (time.longValue() > eventTime + 5) { + if (time.longValue() > eventTime + 100) { if (_log.shouldLog(Log.ERROR)) _log.error("Lots of timer congestion, had to push " + event + " back " + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")"); diff --git a/history.txt b/history.txt index 8e6d5fa9a1..dc29f7bdf0 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,16 @@ -$Id: history.txt,v 1.156 2005/02/24 13:05:26 jrandom Exp $ +$Id: history.txt,v 1.157 2005/02/24 18:53:35 jrandom Exp $ + +2005-02-26 jrandom + * Further streaming lib caching improvements + * Reduce the minimum RTT (used to calculate retry timeouts), but also + increase the RTT on resends. + * Lower the default message size to 4KB from 16KB to further reduce the + chance of failed fragmentation. + * Extend tunnel rebuild throttling to include fallback rebuilds + * If there are less than 20 routers known, don't drop the last 20 (to help + avoid dropping all peers under catastrophic failures) + * New stats for end to end messages - "client.leaseSetFoundLocally", + "client.leaseSetFoundRemoteTime", and "client.leaseSetFailedRemoteTime" 2005-02-24 jrandom * Throttle the number of tunnel rebuilds per minute, preventing CPU diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 9637e9494d..db05918d3e 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.151 $ $Date: 2005/02/24 13:05:26 $"; + public final static String ID = "$Revision: 1.152 $ $Date: 2005/02/24 18:53:36 $"; public final static String VERSION = "0.5.0.1"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index f70577cd09..f4855ef1c5 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -62,6 +62,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private long _cloveId; private long _start; private boolean _finished; + private long _leaseSetLookupBegin; /** * final timeout (in milliseconds) that the outbound message will fail in. @@ -110,13 +111,16 @@ public class OutboundClientMessageOneShotJob extends JobImpl { ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - + ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look fora remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; _clientMessage = msg; _clientMessageId = msg.getMessageId(); _clientMessageSize = msg.getPayload().getSize(); _from = msg.getFromDestination(); _to = msg.getDestination(); + _leaseSetLookupBegin = -1; String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM); if (param == null) @@ -154,9 +158,15 @@ public class OutboundClientMessageOneShotJob extends JobImpl { LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext()); if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job"); - getContext().netDb().lookupLeaseSet(key, success, failed, timeoutMs); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": after sending off leaseSet lookup job"); + LeaseSet ls = getContext().netDb().lookupLeaseSetLocally(key); + if (ls != null) { + getContext().statManager().addRateData("client.leaseSetFoundLocally", 1, 0); + _leaseSetLookupBegin = -1; + success.runJob(); + } else { + _leaseSetLookupBegin = getContext().clock().now(); + getContext().netDb().lookupLeaseSet(key, success, failed, timeoutMs); + } } private boolean getShouldBundle() { @@ -189,6 +199,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } public String getName() { return "Send outbound client message through the lease"; } public void runJob() { + if (_leaseSetLookupBegin > 0) { + long lookupTime = getContext().clock().now() - _leaseSetLookupBegin; + getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, lookupTime); + } boolean ok = getNextLease(); if (ok) send(); @@ -262,7 +276,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl { super(enclosingContext); } public String getName() { return "Lookup for outbound client message failed"; } - public void runJob() { + public void runJob() { + if (_leaseSetLookupBegin > 0) { + long lookupTime = getContext().clock().now() - _leaseSetLookupBegin; + getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime); + } + dieFatal(); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index 10ee2c5242..8b5504a4b8 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -115,8 +115,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { public final static String PROP_DB_DIR = "router.networkDatabase.dbDir"; public final static String DEFAULT_DB_DIR = "netDb"; - /** if we have less than 5 routers left, don't drop any more, even if they're failing or doing bad shit */ - private final static int MIN_REMAINING_ROUTERS = 5; + /** if we have less than 20 routers left, don't drop any more, even if they're failing or doing bad shit */ + private final static int MIN_REMAINING_ROUTERS = 20; /** * dont accept any dbDtore of a router over 6 hours old (unless we dont diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java index 009253e240..abc225e584 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java @@ -37,9 +37,6 @@ public class TunnelBuilder { PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop); if (cfg == null) { - RetryJob j = new RetryJob(ctx, pool); - j.getTiming().setStartAfter(ctx.clock().now() + ctx.random().nextInt(30*1000)); - ctx.jobQueue().addJob(j); return; } OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 5d7bfabfd4..aaedc38d27 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -338,8 +338,8 @@ public class TunnelPool { _log.info(toString() + ": building a fallback tunnel (usable: " + usable + " needed: " + quantity + ")"); if ( (usable == 0) && (_settings.getAllowZeroHop()) ) _builder.buildTunnel(_context, this, true); - else - _builder.buildTunnel(_context, this); + //else + // _builder.buildTunnel(_context, this); refreshBuilders(); } @@ -433,7 +433,7 @@ public class TunnelPool { if (!_alive) return; int added = refreshBuilders(); if ( (added > 0) && (_log.shouldLog(Log.WARN)) ) - _log.warn("Passive rebuilding a tunnel"); + _log.warn("Passive rebuilding a tunnel for " + TunnelPool.this.toString()); requeue(60*1000); } }