diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index cbf9fd0bb1..b74bdb538a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -152,7 +152,8 @@ public class Connection { if (!_connected) return false; started = true; - if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ) { + if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) || + (_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) { if (writeExpire > 0) { if (timeLeft <= 0) { _log.error("Outbound window is full of " + _outboundPackets.size() diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 95ebb291ba..fd5beab814 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -167,11 +167,14 @@ public class ConnectionPacketHandler { // non-ack message payloads are queued in the MessageInputStream packet.releasePayload(); } + + //if (choke) + // con.fastRetransmit(); } private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) { - if ( (nacks != null) && (nacks.length > 0) ) - con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); + //if ( (nacks != null) && (nacks.length > 0) ) + // con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); int numResends = 0; List acked = con.ackPackets(ackThrough, nacks); diff --git a/core/java/src/net/i2p/I2PAppContext.java b/core/java/src/net/i2p/I2PAppContext.java index c25adcc02c..7e0d48e3fd 100644 --- a/core/java/src/net/i2p/I2PAppContext.java +++ b/core/java/src/net/i2p/I2PAppContext.java @@ -22,6 +22,7 @@ import net.i2p.stat.StatManager; import net.i2p.util.Clock; import net.i2p.util.LogManager; import net.i2p.util.RandomSource; +import net.i2p.util.PooledRandomSource; /** *
Provide a base scope for accessing singletons that I2P exposes. Rather than @@ -432,7 +433,7 @@ public class I2PAppContext { private void initializeRandom() { synchronized (this) { if (_random == null) - _random = new RandomSource(this); + _random = new PooledRandomSource(this); _randomInitialized = true; } } diff --git a/core/java/src/net/i2p/util/DecayingBloomFilter.java b/core/java/src/net/i2p/util/DecayingBloomFilter.java index c02b39f3a1..4b00edfc8b 100644 --- a/core/java/src/net/i2p/util/DecayingBloomFilter.java +++ b/core/java/src/net/i2p/util/DecayingBloomFilter.java @@ -108,7 +108,29 @@ public class DecayingBloomFilter { } } + /** + * return true if the entry is already known. this does NOT add the + * entry however. + * + */ + public boolean isKnown(long entry) { + synchronized (this) { + if (_entryBytes <= 7) + entry &= _longToEntryMask; + if (entry < 0) { + DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry); + _longToEntry[0] |= (1 << 7); + } else { + DataHelper.toLong(_longToEntry, 0, _entryBytes, entry); + } + return locked_add(_longToEntry, false); + } + } + private boolean locked_add(byte entry[]) { + return locked_add(entry, true); + } + private boolean locked_add(byte entry[], boolean addIfNew) { if (_extended != null) { // extend the entry to 32 bytes System.arraycopy(entry, 0, _extended, 0, entry.length); @@ -121,8 +143,10 @@ public class DecayingBloomFilter { _currentDuplicates++; return true; } else { - _current.insert(_extended); - _previous.insert(_extended); + if (addIfNew) { + _current.insert(_extended); + _previous.insert(_extended); + } return false; } } else { @@ -132,8 +156,10 @@ public class DecayingBloomFilter { _currentDuplicates++; return true; } else { - _current.locked_insert(entry); - _previous.locked_insert(entry); + if (addIfNew) { + _current.locked_insert(entry); + _previous.locked_insert(entry); + } return false; } } diff --git a/core/java/src/net/i2p/util/LogManager.java b/core/java/src/net/i2p/util/LogManager.java index 4b690afa4c..6d60b95183 100644 --- a/core/java/src/net/i2p/util/LogManager.java +++ b/core/java/src/net/i2p/util/LogManager.java @@ -140,8 +140,8 @@ public class LogManager { public Log getLog(String name) { return getLog(null, name); } public Log getLog(Class cls, String name) { Log rv = null; + String scope = Log.getScope(name, cls); synchronized (_logs) { - String scope = Log.getScope(name, cls); rv = (Log)_logs.get(scope); if (rv == null) { rv = new Log(this, cls, name); @@ -154,10 +154,7 @@ public class LogManager { public List getLogs() { List rv = null; synchronized (_logs) { - rv = new ArrayList(_logs.size()); - for (Iterator iter = _logs.values().iterator(); iter.hasNext(); ) { - rv.add(iter.next()); - } + rv = new ArrayList(_logs.values()); } return rv; } diff --git a/core/java/src/net/i2p/util/PooledRandomSource.java b/core/java/src/net/i2p/util/PooledRandomSource.java new file mode 100644 index 0000000000..21cf4f5145 --- /dev/null +++ b/core/java/src/net/i2p/util/PooledRandomSource.java @@ -0,0 +1,141 @@ +package net.i2p.util; + +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2005 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import net.i2p.I2PAppContext; +import net.i2p.crypto.EntropyHarvester; + +/** + * Maintain a set of PRNGs to feed the apps + */ +public class PooledRandomSource extends RandomSource { + private Log _log; + private RandomSource _pool[]; + private volatile int _nextPool; + + private static final int POOL_SIZE = 16; + + public PooledRandomSource(I2PAppContext context) { + super(context); + _log = context.logManager().getLog(PooledRandomSource.class); + _pool = new RandomSource[POOL_SIZE]; + for (int i = 0; i < POOL_SIZE; i++) { + _pool[i] = new RandomSource(context); + _pool[i].nextBoolean(); + } + _nextPool = 0; + } + + private final RandomSource pickPRNG() { + return _pool[(_nextPool++) % POOL_SIZE]; + } + + /** + * According to the java docs (http://java.sun.com/j2se/1.4.1/docs/api/java/util/Random.html#nextInt(int)) + * nextInt(n) should return a number between 0 and n (including 0 and excluding n). However, their pseudocode, + * as well as sun's, kaffe's, and classpath's implementation INCLUDES NEGATIVE VALUES. + * WTF. Ok, so we're going to have it return between 0 and n (including 0, excluding n), since + * thats what it has been used for. + * + */ + public int nextInt(int n) { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextInt(n); + } + } + + /** + * Like the modified nextInt, nextLong(n) returns a random number from 0 through n, + * including 0, excluding n. + */ + public long nextLong(long n) { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextLong(n); + } + } + + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public boolean nextBoolean() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextBoolean(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public void nextBytes(byte buf[]) { + RandomSource prng = pickPRNG(); + synchronized (prng) { + prng.nextBytes(buf); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public double nextDouble() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextDouble(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public float nextFloat() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextFloat(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public double nextGaussian() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextGaussian(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public int nextInt() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextInt(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public long nextLong() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextLong(); + } + } + + public EntropyHarvester harvester() { + RandomSource prng = pickPRNG(); + return prng.harvester(); + } +} \ No newline at end of file diff --git a/core/java/src/net/i2p/util/RandomSource.java b/core/java/src/net/i2p/util/RandomSource.java index 414f908394..4e22e8dce3 100644 --- a/core/java/src/net/i2p/util/RandomSource.java +++ b/core/java/src/net/i2p/util/RandomSource.java @@ -42,7 +42,7 @@ public class RandomSource extends SecureRandom { * thats what it has been used for. * */ - public synchronized int nextInt(int n) { + public int nextInt(int n) { if (n == 0) return 0; int val = super.nextInt(n); if (val < 0) val = 0 - val; @@ -54,7 +54,7 @@ public class RandomSource extends SecureRandom { * Like the modified nextInt, nextLong(n) returns a random number from 0 through n, * including 0, excluding n. */ - public synchronized long nextLong(long n) { + public long nextLong(long n) { long v = super.nextLong(); if (v < 0) v = 0 - v; if (v >= n) v = v % n; @@ -65,37 +65,37 @@ public class RandomSource extends SecureRandom { * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized boolean nextBoolean() { return super.nextBoolean(); } + public boolean nextBoolean() { return super.nextBoolean(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized void nextBytes(byte buf[]) { super.nextBytes(buf); } + public void nextBytes(byte buf[]) { super.nextBytes(buf); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized double nextDouble() { return super.nextDouble(); } + public double nextDouble() { return super.nextDouble(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized float nextFloat() { return super.nextFloat(); } + public float nextFloat() { return super.nextFloat(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized double nextGaussian() { return super.nextGaussian(); } + public double nextGaussian() { return super.nextGaussian(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized int nextInt() { return super.nextInt(); } + public int nextInt() { return super.nextInt(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized long nextLong() { return super.nextLong(); } + public long nextLong() { return super.nextLong(); } public EntropyHarvester harvester() { return _entropyHarvester; } diff --git a/history.txt b/history.txt index bace186e34..56a52be88b 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,15 @@ -$Id: history.txt,v 1.196 2005/04/20 14:15:28 jrandom Exp $ +$Id: history.txt,v 1.197 2005/04/20 15:14:17 jrandom Exp $ + +2005-04-24 jrandom + * Added a pool of PRNGs using a different synchronization technique, + hopefully sufficient to work around IBM's PRNG bugs until we get our + own Fortuna. + * In the streaming lib, don't jack up the RTT on NACK, and have the window + size bound the not-yet-ready messages to the peer, not the unacked + message count (not sure yet whether this is worthwile). + * Many additions to the messageHistory log. + * Handle out of order tunnel fragment delivery (not an issue on the live + net with TCP, but critical with UDP). * 2005-04-20 0.5.0.7 released diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java index e1516860f2..c08ae0269a 100644 --- a/router/java/src/net/i2p/router/InNetMessagePool.java +++ b/router/java/src/net/i2p/router/InNetMessagePool.java @@ -150,18 +150,20 @@ public class InNetMessagePool implements Service { shortCircuitTunnelData(messageBody, fromRouterHash); allowMatches = false; } else { - HandlerJobBuilder builder = _handlerJobBuilders[type]; + if ( (type > 0) && (type < _handlerJobBuilders.length) ) { + HandlerJobBuilder builder = _handlerJobBuilders[type]; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add message to the inNetMessage pool - builder: " + builder - + " message class: " + messageBody.getClass().getName()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add message to the inNetMessage pool - builder: " + builder + + " message class: " + messageBody.getClass().getName()); - if (builder != null) { - Job job = builder.createJob(messageBody, fromRouter, - fromRouterHash); - if (job != null) { - _context.jobQueue().addJob(job); - jobFound = true; + if (builder != null) { + Job job = builder.createJob(messageBody, fromRouter, + fromRouterHash); + if (job != null) { + _context.jobQueue().addJob(job); + jobFound = true; + } } } } diff --git a/router/java/src/net/i2p/router/MessageHistory.java b/router/java/src/net/i2p/router/MessageHistory.java index 0e087b35dc..8570bcdc29 100644 --- a/router/java/src/net/i2p/router/MessageHistory.java +++ b/router/java/src/net/i2p/router/MessageHistory.java @@ -12,6 +12,7 @@ import java.util.TimeZone; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; +import net.i2p.router.tunnel.HopConfig; import net.i2p.util.Log; /** @@ -178,10 +179,33 @@ public class MessageHistory { if (tunnel == null) return; StringBuffer buf = new StringBuffer(128); buf.append(getPrefix()); - buf.append("joining tunnel [").append(tunnel.getReceiveTunnelId(0).getTunnelId()).append("] as [").append(state).append("] "); + buf.append("joining as [").append(state); + buf.append("] to tunnel: ").append(tunnel.toString()); addEntry(buf.toString()); } + /** + * The local router has joined the given tunnel operating in the given state. + * + * @param state {"free inbound", "allocated inbound", "inactive inbound", "outbound", "participant", "pending"} + * @param tunnel tunnel joined + */ + public void tunnelJoined(String state, HopConfig tunnel) { + if (!_doLog) return; + if (tunnel == null) return; + StringBuffer buf = new StringBuffer(128); + buf.append(getPrefix()); + buf.append("joining as [").append(state); + buf.append("] to tunnel: ").append(tunnel.toString()); + addEntry(buf.toString()); + } + + public void tunnelDispatched(String info) { + if (!_doLog) return; + if (info == null) return; + addEntry(getPrefix() + "tunnel dispatched: " + info); + } + /** * The local router has detected a failure in the given tunnel * @@ -352,9 +376,6 @@ public class MessageHistory { buf.append("from [").append(getName(from)).append("] "); buf.append("expiring on [").append(getTime(expiration)).append("] valid? ").append(isValid); addEntry(buf.toString()); - if (messageType.equals("net.i2p.data.i2np.TunnelMessage")) { - //_log.warn("ReceiveMessage tunnel message ["+messageId+"]", new Exception("Receive tunnel")); - } } public void receiveMessage(String messageType, long messageId, long expiration, boolean isValid) { receiveMessage(messageType, messageId, expiration, null, isValid); @@ -404,12 +425,13 @@ public class MessageHistory { addEntry(buf.toString()); } - public void receiveTunnelFragment(long messageId, int fragmentId) { + public void receiveTunnelFragment(long messageId, int fragmentId, String status) { if (!_doLog) return; if (messageId == -1) throw new IllegalArgumentException("why are you -1?"); StringBuffer buf = new StringBuffer(48); buf.append(getPrefix()); buf.append("Receive fragment ").append(fragmentId).append(" in ").append(messageId); + buf.append(" status: ").append(status); addEntry(buf.toString()); } public void receiveTunnelFragmentComplete(long messageId) { @@ -420,12 +442,13 @@ public class MessageHistory { buf.append("Receive fragmented message completely: ").append(messageId); addEntry(buf.toString()); } - public void droppedFragmentedMessage(long messageId) { + public void droppedFragmentedMessage(long messageId, String status) { if (!_doLog) return; if (messageId == -1) throw new IllegalArgumentException("why are you -1?"); StringBuffer buf = new StringBuffer(48); buf.append(getPrefix()); buf.append("Fragmented message dropped: ").append(messageId); + buf.append(" ").append(status); addEntry(buf.toString()); } public void fragmentMessage(long messageId, int numFragments) { @@ -436,6 +459,15 @@ public class MessageHistory { buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments); addEntry(buf.toString()); } + public void fragmentMessage(long messageId, int numFragments, String tunnel) { + if (!_doLog) return; + if (messageId == -1) throw new IllegalArgumentException("why are you -1?"); + StringBuffer buf = new StringBuffer(48); + buf.append(getPrefix()); + buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments); + buf.append(" on ").append(tunnel); + addEntry(buf.toString()); + } public void droppedTunnelDataMessageUnknown(long msgId, long tunnelId) { if (!_doLog) return; if (msgId == -1) throw new IllegalArgumentException("why are you -1?"); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index bd50e1adb8..bdf023fde5 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.188 $ $Date: 2005/04/20 14:15:25 $"; + public final static String ID = "$Revision: 1.189 $ $Date: 2005/04/20 15:14:19 $"; public final static String VERSION = "0.5.0.7"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 407bd07bba..fd7b380a88 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -24,6 +24,7 @@ public class ACKSender implements Runnable { _fragments = fragments; _transport = transport; _builder = new PacketBuilder(_context); + _context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", new long[] { 60*1000, 60*60*1000 }); } public void run() { @@ -32,6 +33,8 @@ public class ACKSender implements Runnable { if (peer != null) { List acks = peer.retrieveACKs(); if ( (acks != null) && (acks.size() > 0) ) { + _context.statManager().addRateData("udp.sendACKCount", acks.size(), 0); + _context.statManager().getStatLog().addData(peer.getRemoteHostString(), "udp.peer.sendACKCount", acks.size(), 0); UDPPacket ack = _builder.buildACK(peer, acks); if (_log.shouldLog(Log.INFO)) _log.info("Sending ACK for " + acks); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 76d0cd39bd..72d6349eb4 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import net.i2p.router.RouterContext; +import net.i2p.util.DecayingBloomFilter; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -28,8 +29,8 @@ public class InboundMessageFragments { private List _unsentACKs; /** list of messages (InboundMessageState) fully received but not interpreted yet */ private List _completeMessages; - /** list of message IDs (Long) recently received, so we can ignore in flight dups */ - private List _recentlyCompletedMessages; + /** list of message IDs recently received, so we can ignore in flight dups */ + private DecayingBloomFilter _recentlyCompletedMessages; private OutboundMessageFragments _outbound; private UDPTransport _transport; /** this can be broken down further, but to start, OneBigLock does the trick */ @@ -39,6 +40,8 @@ public class InboundMessageFragments { private static final int RECENTLY_COMPLETED_SIZE = 100; /** how frequently do we want to send ACKs to a peer? */ private static final int ACK_FREQUENCY = 200; + /** decay the recently completed every 2 minutes */ + private static final int DECAY_PERIOD = 120*1000; public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) { _context = ctx; @@ -46,7 +49,6 @@ public class InboundMessageFragments { _inboundMessages = new HashMap(64); _unsentACKs = new ArrayList(64); _completeMessages = new ArrayList(64); - _recentlyCompletedMessages = new ArrayList(RECENTLY_COMPLETED_SIZE); _outbound = outbound; _transport = transport; _context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); @@ -60,6 +62,11 @@ public class InboundMessageFragments { public void startup() { _alive = true; + // may want to extend the DecayingBloomFilter so we can use a smaller + // array size (currently its tuned for 10 minute rates for the + // messageValidator) + _recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 8); + I2PThread t = new I2PThread(new ACKSender(_context, this, _transport), "UDP ACK sender"); t.setDaemon(true); t.start(); @@ -70,6 +77,9 @@ public class InboundMessageFragments { } public void shutdown() { _alive = false; + if (_recentlyCompletedMessages != null) + _recentlyCompletedMessages.stopDecaying(); + _recentlyCompletedMessages = null; synchronized (_stateLock) { _completeMessages.clear(); _unsentACKs.clear(); @@ -112,8 +122,15 @@ public class InboundMessageFragments { for (int i = 0; i < fragments; i++) { Long messageId = new Long(data.readMessageId(i)); - if (_recentlyCompletedMessages.contains(messageId)) { + if (_recentlyCompletedMessages.isKnown(messageId.longValue())) { _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0); + from.messageFullyReceived(messageId); + if (!_unsentACKs.contains(from)) + _unsentACKs.add(from); + if (_log.shouldLog(Log.WARN)) + _log.warn("Message received is a dup: " + messageId + " dups: " + + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " + + _recentlyCompletedMessages.getInsertedCount()); continue; } @@ -132,9 +149,7 @@ public class InboundMessageFragments { messageComplete = true; messages.remove(messageId); - while (_recentlyCompletedMessages.size() >= RECENTLY_COMPLETED_SIZE) - _recentlyCompletedMessages.remove(0); - _recentlyCompletedMessages.add(messageId); + _recentlyCompletedMessages.add(messageId.longValue()); _completeMessages.add(state); @@ -169,12 +184,15 @@ public class InboundMessageFragments { long acks[] = data.readACKs(); if (acks != null) { _context.statManager().addRateData("udp.receivedACKs", acks.length, 0); + _context.statManager().getStatLog().addData(from.getRemoteHostString(), "udp.peer.receiveACKCount", acks.length, 0); + for (int i = 0; i < acks.length; i++) { if (_log.shouldLog(Log.INFO)) _log.info("Full ACK of message " + acks[i] + " received!"); fragments += _outbound.acked(acks[i], from.getRemotePeer()); } - from.messageACKed(fragments * from.getMTU()); // estimated size + } else { + _log.error("Received ACKs with no acks?! " + data); } } if (data.readECN()) @@ -212,7 +230,8 @@ public class InboundMessageFragments { synchronized (_stateLock) { for (int i = 0; i < _unsentACKs.size(); i++) { PeerState peer = (PeerState)_unsentACKs.get(i); - if (peer.getLastACKSend() + ACK_FREQUENCY <= now) { + if ( (peer.getLastACKSend() + ACK_FREQUENCY <= now) || + (peer.unsentACKThresholdReached()) ) { _unsentACKs.remove(i); peer.setLastACKSend(now); return peer; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index fcc43492c1..32619896a9 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -31,6 +31,8 @@ public class OutboundMessageFragments { /** which message should we build the next packet out of? */ private int _nextPacketMessage; private PacketBuilder _builder; + /** if we can handle more messages explicitly, set this to true */ + private boolean _allowExcess; private static final int MAX_ACTIVE = 64; // don't send a packet more than 10 times @@ -44,12 +46,14 @@ public class OutboundMessageFragments { _nextPacketMessage = 0; _builder = new PacketBuilder(ctx); _alive = true; + _allowExcess = false; _context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmFragments", "How many fragments are included in a fully ACKed message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmVolley", "How many times did fragments need to be sent before ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendFailed", "How many fragments were in a message that couldn't be delivered", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the active pool when a new one is added", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); } public void startup() { _alive = true; } @@ -75,7 +79,9 @@ public class OutboundMessageFragments { return false; else if (_activeMessages.size() < MAX_ACTIVE) return true; - else + else if (_allowExcess) + return true; + else _activeMessages.wait(); } } catch (InterruptedException ie) {} @@ -90,12 +96,16 @@ public class OutboundMessageFragments { public void add(OutNetMessage msg) { OutboundMessageState state = new OutboundMessageState(_context); boolean ok = state.initialize(msg); + state.setPeer(_transport.getPeerState(msg.getTarget().getIdentity().calculateHash())); finishMessages(); + int active = 0; synchronized (_activeMessages) { if (ok) _activeMessages.add(state); + active = _activeMessages.size(); _activeMessages.notifyAll(); } + _context.statManager().addRateData("udp.outboundActiveCount", active, 0); } /** @@ -156,6 +166,9 @@ public class OutboundMessageFragments { } } + private static final long SECOND_MASK = 1023l; + + /** * Grab the next packet that we want to send, blocking until one is ready. * This is the main driver for the packet scheduler @@ -208,17 +221,31 @@ public class OutboundMessageFragments { + " remaining" + " for message " + state.getMessageId() + ": " + state); + if (state.justBeganVolley() && (state.getPushCount() > 0) && (state.getFragmentCount() > 1)) { + peer.messageRetransmitted(); + if (_log.shouldLog(Log.ERROR)) + _log.error("Retransmitting " + state + " to " + peer); + } + // for fairness, we move on in a round robin _nextPacketMessage = i + 1; - if (state.getPushCount() != oldVolley) { + if (currentFragment >= state.getFragmentCount() - 1) { + // this is the last fragment _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount()); - state.setNextSendTime(now + (1000-(now%1000)) + _context.random().nextInt(4000)); + if (state.getPeer() != null) { + int rto = state.getPeer().getRTO() * state.getPushCount(); + //_log.error("changed volley, rto=" + rto + " volley="+ state.getPushCount()); + state.setNextSendTime(now + rto); + } else { + _log.error("changed volley, unknown peer"); + state.setNextSendTime(now + 1000 + _context.random().nextInt(2000)); + } } else { if (peer.getSendWindowBytesRemaining() > 0) state.setNextSendTime(now); else - state.setNextSendTime(now + (1000-(now%1000))); + state.setNextSendTime((now + 1024) & ~SECOND_MASK); } break; } else { @@ -226,7 +253,7 @@ public class OutboundMessageFragments { _log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes() + " available=" + peer.getSendWindowBytesRemaining() + " for message " + state.getMessageId() + ": " + state); - state.setNextSendTime(now + (1000-(now%1000))); + state.setNextSendTime((now + 1024) & ~SECOND_MASK); currentFragment = -1; } } @@ -234,7 +261,7 @@ public class OutboundMessageFragments { long time = state.getNextSendTime(); if ( (nextSend < 0) || (time < nextSend) ) nextSend = time; - } + } // end of the for(activeMessages) if (currentFragment < 0) { if (nextSend <= 0) { @@ -248,13 +275,16 @@ public class OutboundMessageFragments { delay = 10; if (delay > 1000) delay = 1000; + _allowExcess = true; + _activeMessages.notifyAll(); try { _activeMessages.wait(delay); } catch (InterruptedException ie) {} } } - } - } + _allowExcess = false; + } // end of the synchronized block + } // end of the while (alive && !found) if (currentFragment >= 0) { if (_log.shouldLog(Log.INFO)) @@ -269,8 +299,8 @@ public class OutboundMessageFragments { } private static final int SSU_HEADER_SIZE = 46; - private static final int UDP_HEADER_SIZE = 8; - private static final int IP_HEADER_SIZE = 20; + static final int UDP_HEADER_SIZE = 8; + static final int IP_HEADER_SIZE = 20; /** how much payload data can we shove in there? */ private static final int fragmentSize(int mtu) { return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE; @@ -309,17 +339,23 @@ public class OutboundMessageFragments { } if (state != null) { + int numSends = state.getMaxSends(); if (_log.shouldLog(Log.INFO)) _log.info("Received ack of " + messageId + " by " + ackedBy.toBase64() - + " after " + state.getLifetime()); + + " after " + state.getLifetime() + " and " + numSends + " sends"); _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); - int numSends = state.getMaxSends(); _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); if ( (numSends > 1) && (state.getPeer() != null) ) state.getPeer().congestionOccurred(); _transport.succeeded(state.getMessage()); int numFragments = state.getFragmentCount(); + if (state.getPeer() != null) { + // this adjusts the rtt/rto/window/etc + state.getPeer().messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends()); + } else { + _log.warn("message acked, but no peer attacked: " + state); + } state.releaseResources(); return numFragments; } else { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 5e3e92a135..1f73553ea6 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -31,6 +31,7 @@ public class OutboundMessageState { private long _nextSendTime; private int _pushCount; private short _maxSends; + private int _nextSendFragment; public static final int MAX_FRAGMENTS = 32; private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024); @@ -40,6 +41,7 @@ public class OutboundMessageState { _log = _context.logManager().getLog(OutboundMessageState.class); _pushCount = 0; _maxSends = 0; + _nextSendFragment = 0; } public synchronized boolean initialize(OutNetMessage msg) { @@ -100,6 +102,7 @@ public class OutboundMessageState { public OutNetMessage getMessage() { return _message; } public long getMessageId() { return _messageId; } public PeerState getPeer() { return _peer; } + public void setPeer(PeerState peer) { _peer = peer; } public boolean isExpired() { return _expiration < _context.clock().now(); } @@ -160,6 +163,7 @@ public class OutboundMessageState { else return _fragmentSends.length; } + public int getFragmentSize() { return _fragmentSize; } /** should we continue sending this fragment? */ public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; } public synchronized int fragmentSize(int fragmentNum) { @@ -178,6 +182,17 @@ public class OutboundMessageState { * @return fragment index, or -1 if all of the fragments were acked */ public int pickNextFragment() { + if (true) { + int rv = _nextSendFragment; + _fragmentSends[rv]++; + _maxSends = _fragmentSends[rv]; + _nextSendFragment++; + if (_nextSendFragment >= _fragmentSends.length) { + _nextSendFragment = 0; + _pushCount++; + } + return rv; + } short minValue = -1; int minIndex = -1; int startOffset = _context.random().nextInt(_fragmentSends.length); @@ -207,9 +222,9 @@ public class OutboundMessageState { break; } } - if (endOfVolley) + if (endOfVolley) { _pushCount++; - + } if (_log.shouldLog(Log.DEBUG)) { StringBuffer buf = new StringBuffer(64); @@ -223,6 +238,13 @@ public class OutboundMessageState { } return minIndex; } + + public boolean justBeganVolley() { + if (_fragmentSends.length == 1) + return true; + else + return _nextSendFragment == 1; + } /** * Write a part of the the message onto the specified buffer. diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 3c6d4928d3..7c6b14d44a 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -113,6 +113,12 @@ public class PeerState { private int _mtu; /** when did we last check the MTU? */ private long _mtuLastChecked; + /** current round trip time estimate */ + private int _rtt; + /** smoothed mean deviation in the rtt */ + private int _rttDeviation; + /** current retransmission timeout */ + private int _rto; private long _messagesReceived; private long _messagesSent; @@ -120,7 +126,7 @@ public class PeerState { private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024; private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; - private static final int DEFAULT_MTU = 1024; + private static final int DEFAULT_MTU = 1492; public PeerState(I2PAppContext ctx) { _context = ctx; @@ -153,6 +159,9 @@ public class PeerState { _mtu = DEFAULT_MTU; _mtuLastChecked = -1; _lastACKSend = -1; + _rtt = 1000; + _rttDeviation = _rtt; + _rto = 6000; _messagesReceived = 0; _messagesSent = 0; } @@ -328,6 +337,7 @@ public class PeerState { _sendWindowBytesRemaining = _sendWindowBytes; _lastSendRefill = now; } + //if (true) return true; if (size <= _sendWindowBytesRemaining) { _sendWindowBytesRemaining -= size; _lastSendTime = now; @@ -393,15 +403,22 @@ public class PeerState { /** pull off the ACKs (Long) to send to the peer */ public List retrieveACKs() { List rv = null; + int threshold = countMaxACKs(); synchronized (_currentACKs) { - rv = new ArrayList(_currentACKs); - _currentACKs.clear(); + if (_currentACKs.size() < threshold) { + rv = new ArrayList(_currentACKs); + _currentACKs.clear(); + } else { + rv = new ArrayList(threshold); + for (int i = 0; i < threshold; i++) + rv.add(_currentACKs.remove(0)); + } } return rv; } /** we sent a message which was ACKed containing the given # of bytes */ - public void messageACKed(int bytesACKed) { + public void messageACKed(int bytesACKed, long lifetime, int numSends) { _consecutiveSendingSecondsWithoutACKs = 0; if (_sendWindowBytes <= _slowStartThreshold) { _sendWindowBytes += bytesACKed; @@ -414,7 +431,35 @@ public class PeerState { _sendWindowBytes = MAX_SEND_WINDOW_BYTES; _lastReceiveTime = _context.clock().now(); _messagesSent++; + if (numSends <= 2) + recalculateTimeouts(lifetime); + else + _log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); } + + /** adjust the tcp-esque timeouts */ + private void recalculateTimeouts(long lifetime) { + _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation)); + _rtt = (int)((float)_rtt*(0.9f) + (0.1f)*(float)lifetime); + _rto = _rtt + (_rttDeviation<<2); + if (_log.shouldLog(Log.WARN)) + _log.warn("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt + + " rttDev=" + _rttDeviation + " rto=" + _rto); + if (_rto < 1000) + _rto = 1000; + if (_rto > 5000) + _rto = 5000; + } + /** we are resending a packet, so lets jack up the rto */ + public void messageRetransmitted() { + //_rto *= 2; + } + /** how long does it usually take to get a message ACKed? */ + public int getRTT() { return _rtt; } + /** how soon should we retransmit an unacked packet? */ + public int getRTO() { return _rto; } + /** how skewed are the measured RTTs? */ + public long getRTTDeviation() { return _rttDeviation; } public long getMessagesSent() { return _messagesSent; } public long getMessagesReceived() { return _messagesReceived; } @@ -435,6 +480,25 @@ public class PeerState { /** when did we last send an ACK to the peer? */ public long getLastACKSend() { return _lastACKSend; } public void setLastACKSend(long when) { _lastACKSend = when; } + public boolean unsentACKThresholdReached() { + int threshold = countMaxACKs(); + synchronized (_currentACKs) { + return _currentACKs.size() >= threshold; + } + } + private int countMaxACKs() { + return (_mtu + - OutboundMessageFragments.IP_HEADER_SIZE + - OutboundMessageFragments.UDP_HEADER_SIZE + - UDPPacket.IV_SIZE + - UDPPacket.MAC_SIZE + - 1 // type flag + - 4 // timestamp + - 1 // data flag + - 1 // # ACKs + - 16 // padding safety + ) / 4; + } public String getRemoteHostString() { return _remoteHostString; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index d7e1403ae3..4d340f19a8 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -24,7 +24,7 @@ import net.i2p.util.Log; */ public class UDPPacket { private I2PAppContext _context; - private Log _log; + private static Log _log; private DatagramPacket _packet; private short _priority; private long _initializeTime; @@ -35,6 +35,7 @@ public class UDPPacket { private static final List _packetCache; static { _packetCache = new ArrayList(256); + _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); } private static final boolean CACHE = false; @@ -67,7 +68,6 @@ public class UDPPacket { private UDPPacket(I2PAppContext ctx) { _context = ctx; - _log = ctx.logManager().getLog(UDPPacket.class); _dataBuf = _dataCache.acquire(); _data = _dataBuf.getData(); _packet = new DatagramPacket(_data, MAX_PACKET_SIZE); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 4fb1f7743a..7b30df4808 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -86,7 +86,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; /** should we flood all UDP peers with the configured rate? */ - private static final boolean SHOULD_FLOOD_PEERS = false; + private static final boolean SHOULD_FLOOD_PEERS = true; public UDPTransport(RouterContext ctx) { super(ctx); @@ -492,6 +492,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append("