From b2f0d17e94d14935df1dfc85d03f4f3eae66000e Mon Sep 17 00:00:00 2001 From: jrandom Date: Sun, 24 Apr 2005 18:42:02 +0000 Subject: [PATCH] 2005-04-24 jrandom * Added a pool of PRNGs using a different synchronization technique, hopefully sufficient to work around IBM's PRNG bugs until we get our own Fortuna. * In the streaming lib, don't jack up the RTT on NACK, and have the window size bound the not-yet-ready messages to the peer, not the unacked message count (not sure yet whether this is worthwile). * Many additions to the messageHistory log. * Handle out of order tunnel fragment delivery (not an issue on the live net with TCP, but critical with UDP). and for udp stuff: * implemented tcp-esque rto code in the udp transport * make sure we don't ACK too many messages at once * transmit fragments in a simple (nonrandom) order so that we can more easily adjust timeouts/etc. * let the active outbound pool grow dynamically if there are outbound slots to spare * use a simple decaying bloom filter at the UDP level to drop duplicate resent packets. --- .../net/i2p/client/streaming/Connection.java | 3 +- .../streaming/ConnectionPacketHandler.java | 7 +- core/java/src/net/i2p/I2PAppContext.java | 3 +- .../src/net/i2p/util/DecayingBloomFilter.java | 34 ++++- core/java/src/net/i2p/util/LogManager.java | 7 +- .../src/net/i2p/util/PooledRandomSource.java | 141 ++++++++++++++++++ core/java/src/net/i2p/util/RandomSource.java | 18 +-- history.txt | 13 +- .../src/net/i2p/router/InNetMessagePool.java | 22 +-- .../src/net/i2p/router/MessageHistory.java | 44 +++++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../i2p/router/transport/udp/ACKSender.java | 3 + .../udp/InboundMessageFragments.java | 37 +++-- .../udp/OutboundMessageFragments.java | 60 ++++++-- .../transport/udp/OutboundMessageState.java | 26 +++- .../i2p/router/transport/udp/PeerState.java | 72 ++++++++- .../i2p/router/transport/udp/UDPPacket.java | 4 +- .../router/transport/udp/UDPTransport.java | 21 ++- .../router/tunnel/BatchedPreprocessor.java | 19 ++- .../tunnel/BatchedRouterPreprocessor.java | 13 +- .../i2p/router/tunnel/FragmentHandler.java | 10 +- .../i2p/router/tunnel/FragmentedMessage.java | 4 +- .../router/tunnel/RouterFragmentHandler.java | 17 ++- .../router/tunnel/TrivialPreprocessor.java | 6 +- .../i2p/router/tunnel/TunnelDispatcher.java | 26 +++- 25 files changed, 517 insertions(+), 97 deletions(-) create mode 100644 core/java/src/net/i2p/util/PooledRandomSource.java diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index cbf9fd0bb1..b74bdb538a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -152,7 +152,8 @@ public class Connection { if (!_connected) return false; started = true; - if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ) { + if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) || + (_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) { if (writeExpire > 0) { if (timeLeft <= 0) { _log.error("Outbound window is full of " + _outboundPackets.size() diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 95ebb291ba..fd5beab814 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -167,11 +167,14 @@ public class ConnectionPacketHandler { // non-ack message payloads are queued in the MessageInputStream packet.releasePayload(); } + + //if (choke) + // con.fastRetransmit(); } private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) { - if ( (nacks != null) && (nacks.length > 0) ) - con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); + //if ( (nacks != null) && (nacks.length > 0) ) + // con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); int numResends = 0; List acked = con.ackPackets(ackThrough, nacks); diff --git a/core/java/src/net/i2p/I2PAppContext.java b/core/java/src/net/i2p/I2PAppContext.java index c25adcc02c..7e0d48e3fd 100644 --- a/core/java/src/net/i2p/I2PAppContext.java +++ b/core/java/src/net/i2p/I2PAppContext.java @@ -22,6 +22,7 @@ import net.i2p.stat.StatManager; import net.i2p.util.Clock; import net.i2p.util.LogManager; import net.i2p.util.RandomSource; +import net.i2p.util.PooledRandomSource; /** *

Provide a base scope for accessing singletons that I2P exposes. Rather than @@ -432,7 +433,7 @@ public class I2PAppContext { private void initializeRandom() { synchronized (this) { if (_random == null) - _random = new RandomSource(this); + _random = new PooledRandomSource(this); _randomInitialized = true; } } diff --git a/core/java/src/net/i2p/util/DecayingBloomFilter.java b/core/java/src/net/i2p/util/DecayingBloomFilter.java index c02b39f3a1..4b00edfc8b 100644 --- a/core/java/src/net/i2p/util/DecayingBloomFilter.java +++ b/core/java/src/net/i2p/util/DecayingBloomFilter.java @@ -108,7 +108,29 @@ public class DecayingBloomFilter { } } + /** + * return true if the entry is already known. this does NOT add the + * entry however. + * + */ + public boolean isKnown(long entry) { + synchronized (this) { + if (_entryBytes <= 7) + entry &= _longToEntryMask; + if (entry < 0) { + DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry); + _longToEntry[0] |= (1 << 7); + } else { + DataHelper.toLong(_longToEntry, 0, _entryBytes, entry); + } + return locked_add(_longToEntry, false); + } + } + private boolean locked_add(byte entry[]) { + return locked_add(entry, true); + } + private boolean locked_add(byte entry[], boolean addIfNew) { if (_extended != null) { // extend the entry to 32 bytes System.arraycopy(entry, 0, _extended, 0, entry.length); @@ -121,8 +143,10 @@ public class DecayingBloomFilter { _currentDuplicates++; return true; } else { - _current.insert(_extended); - _previous.insert(_extended); + if (addIfNew) { + _current.insert(_extended); + _previous.insert(_extended); + } return false; } } else { @@ -132,8 +156,10 @@ public class DecayingBloomFilter { _currentDuplicates++; return true; } else { - _current.locked_insert(entry); - _previous.locked_insert(entry); + if (addIfNew) { + _current.locked_insert(entry); + _previous.locked_insert(entry); + } return false; } } diff --git a/core/java/src/net/i2p/util/LogManager.java b/core/java/src/net/i2p/util/LogManager.java index 4b690afa4c..6d60b95183 100644 --- a/core/java/src/net/i2p/util/LogManager.java +++ b/core/java/src/net/i2p/util/LogManager.java @@ -140,8 +140,8 @@ public class LogManager { public Log getLog(String name) { return getLog(null, name); } public Log getLog(Class cls, String name) { Log rv = null; + String scope = Log.getScope(name, cls); synchronized (_logs) { - String scope = Log.getScope(name, cls); rv = (Log)_logs.get(scope); if (rv == null) { rv = new Log(this, cls, name); @@ -154,10 +154,7 @@ public class LogManager { public List getLogs() { List rv = null; synchronized (_logs) { - rv = new ArrayList(_logs.size()); - for (Iterator iter = _logs.values().iterator(); iter.hasNext(); ) { - rv.add(iter.next()); - } + rv = new ArrayList(_logs.values()); } return rv; } diff --git a/core/java/src/net/i2p/util/PooledRandomSource.java b/core/java/src/net/i2p/util/PooledRandomSource.java new file mode 100644 index 0000000000..21cf4f5145 --- /dev/null +++ b/core/java/src/net/i2p/util/PooledRandomSource.java @@ -0,0 +1,141 @@ +package net.i2p.util; + +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2005 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import net.i2p.I2PAppContext; +import net.i2p.crypto.EntropyHarvester; + +/** + * Maintain a set of PRNGs to feed the apps + */ +public class PooledRandomSource extends RandomSource { + private Log _log; + private RandomSource _pool[]; + private volatile int _nextPool; + + private static final int POOL_SIZE = 16; + + public PooledRandomSource(I2PAppContext context) { + super(context); + _log = context.logManager().getLog(PooledRandomSource.class); + _pool = new RandomSource[POOL_SIZE]; + for (int i = 0; i < POOL_SIZE; i++) { + _pool[i] = new RandomSource(context); + _pool[i].nextBoolean(); + } + _nextPool = 0; + } + + private final RandomSource pickPRNG() { + return _pool[(_nextPool++) % POOL_SIZE]; + } + + /** + * According to the java docs (http://java.sun.com/j2se/1.4.1/docs/api/java/util/Random.html#nextInt(int)) + * nextInt(n) should return a number between 0 and n (including 0 and excluding n). However, their pseudocode, + * as well as sun's, kaffe's, and classpath's implementation INCLUDES NEGATIVE VALUES. + * WTF. Ok, so we're going to have it return between 0 and n (including 0, excluding n), since + * thats what it has been used for. + * + */ + public int nextInt(int n) { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextInt(n); + } + } + + /** + * Like the modified nextInt, nextLong(n) returns a random number from 0 through n, + * including 0, excluding n. + */ + public long nextLong(long n) { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextLong(n); + } + } + + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public boolean nextBoolean() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextBoolean(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public void nextBytes(byte buf[]) { + RandomSource prng = pickPRNG(); + synchronized (prng) { + prng.nextBytes(buf); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public double nextDouble() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextDouble(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public float nextFloat() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextFloat(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public double nextGaussian() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextGaussian(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public int nextInt() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextInt(); + } + } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public long nextLong() { + RandomSource prng = pickPRNG(); + synchronized (prng) { + return prng.nextLong(); + } + } + + public EntropyHarvester harvester() { + RandomSource prng = pickPRNG(); + return prng.harvester(); + } +} \ No newline at end of file diff --git a/core/java/src/net/i2p/util/RandomSource.java b/core/java/src/net/i2p/util/RandomSource.java index 414f908394..4e22e8dce3 100644 --- a/core/java/src/net/i2p/util/RandomSource.java +++ b/core/java/src/net/i2p/util/RandomSource.java @@ -42,7 +42,7 @@ public class RandomSource extends SecureRandom { * thats what it has been used for. * */ - public synchronized int nextInt(int n) { + public int nextInt(int n) { if (n == 0) return 0; int val = super.nextInt(n); if (val < 0) val = 0 - val; @@ -54,7 +54,7 @@ public class RandomSource extends SecureRandom { * Like the modified nextInt, nextLong(n) returns a random number from 0 through n, * including 0, excluding n. */ - public synchronized long nextLong(long n) { + public long nextLong(long n) { long v = super.nextLong(); if (v < 0) v = 0 - v; if (v >= n) v = v % n; @@ -65,37 +65,37 @@ public class RandomSource extends SecureRandom { * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized boolean nextBoolean() { return super.nextBoolean(); } + public boolean nextBoolean() { return super.nextBoolean(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized void nextBytes(byte buf[]) { super.nextBytes(buf); } + public void nextBytes(byte buf[]) { super.nextBytes(buf); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized double nextDouble() { return super.nextDouble(); } + public double nextDouble() { return super.nextDouble(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized float nextFloat() { return super.nextFloat(); } + public float nextFloat() { return super.nextFloat(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized double nextGaussian() { return super.nextGaussian(); } + public double nextGaussian() { return super.nextGaussian(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized int nextInt() { return super.nextInt(); } + public int nextInt() { return super.nextInt(); } /** * override as synchronized, for those JVMs that don't always pull via * nextBytes (cough ibm) */ - public synchronized long nextLong() { return super.nextLong(); } + public long nextLong() { return super.nextLong(); } public EntropyHarvester harvester() { return _entropyHarvester; } diff --git a/history.txt b/history.txt index bace186e34..56a52be88b 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,15 @@ -$Id: history.txt,v 1.196 2005/04/20 14:15:28 jrandom Exp $ +$Id: history.txt,v 1.197 2005/04/20 15:14:17 jrandom Exp $ + +2005-04-24 jrandom + * Added a pool of PRNGs using a different synchronization technique, + hopefully sufficient to work around IBM's PRNG bugs until we get our + own Fortuna. + * In the streaming lib, don't jack up the RTT on NACK, and have the window + size bound the not-yet-ready messages to the peer, not the unacked + message count (not sure yet whether this is worthwile). + * Many additions to the messageHistory log. + * Handle out of order tunnel fragment delivery (not an issue on the live + net with TCP, but critical with UDP). * 2005-04-20 0.5.0.7 released diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java index e1516860f2..c08ae0269a 100644 --- a/router/java/src/net/i2p/router/InNetMessagePool.java +++ b/router/java/src/net/i2p/router/InNetMessagePool.java @@ -150,18 +150,20 @@ public class InNetMessagePool implements Service { shortCircuitTunnelData(messageBody, fromRouterHash); allowMatches = false; } else { - HandlerJobBuilder builder = _handlerJobBuilders[type]; + if ( (type > 0) && (type < _handlerJobBuilders.length) ) { + HandlerJobBuilder builder = _handlerJobBuilders[type]; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add message to the inNetMessage pool - builder: " + builder - + " message class: " + messageBody.getClass().getName()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add message to the inNetMessage pool - builder: " + builder + + " message class: " + messageBody.getClass().getName()); - if (builder != null) { - Job job = builder.createJob(messageBody, fromRouter, - fromRouterHash); - if (job != null) { - _context.jobQueue().addJob(job); - jobFound = true; + if (builder != null) { + Job job = builder.createJob(messageBody, fromRouter, + fromRouterHash); + if (job != null) { + _context.jobQueue().addJob(job); + jobFound = true; + } } } } diff --git a/router/java/src/net/i2p/router/MessageHistory.java b/router/java/src/net/i2p/router/MessageHistory.java index 0e087b35dc..8570bcdc29 100644 --- a/router/java/src/net/i2p/router/MessageHistory.java +++ b/router/java/src/net/i2p/router/MessageHistory.java @@ -12,6 +12,7 @@ import java.util.TimeZone; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; +import net.i2p.router.tunnel.HopConfig; import net.i2p.util.Log; /** @@ -178,10 +179,33 @@ public class MessageHistory { if (tunnel == null) return; StringBuffer buf = new StringBuffer(128); buf.append(getPrefix()); - buf.append("joining tunnel [").append(tunnel.getReceiveTunnelId(0).getTunnelId()).append("] as [").append(state).append("] "); + buf.append("joining as [").append(state); + buf.append("] to tunnel: ").append(tunnel.toString()); addEntry(buf.toString()); } + /** + * The local router has joined the given tunnel operating in the given state. + * + * @param state {"free inbound", "allocated inbound", "inactive inbound", "outbound", "participant", "pending"} + * @param tunnel tunnel joined + */ + public void tunnelJoined(String state, HopConfig tunnel) { + if (!_doLog) return; + if (tunnel == null) return; + StringBuffer buf = new StringBuffer(128); + buf.append(getPrefix()); + buf.append("joining as [").append(state); + buf.append("] to tunnel: ").append(tunnel.toString()); + addEntry(buf.toString()); + } + + public void tunnelDispatched(String info) { + if (!_doLog) return; + if (info == null) return; + addEntry(getPrefix() + "tunnel dispatched: " + info); + } + /** * The local router has detected a failure in the given tunnel * @@ -352,9 +376,6 @@ public class MessageHistory { buf.append("from [").append(getName(from)).append("] "); buf.append("expiring on [").append(getTime(expiration)).append("] valid? ").append(isValid); addEntry(buf.toString()); - if (messageType.equals("net.i2p.data.i2np.TunnelMessage")) { - //_log.warn("ReceiveMessage tunnel message ["+messageId+"]", new Exception("Receive tunnel")); - } } public void receiveMessage(String messageType, long messageId, long expiration, boolean isValid) { receiveMessage(messageType, messageId, expiration, null, isValid); @@ -404,12 +425,13 @@ public class MessageHistory { addEntry(buf.toString()); } - public void receiveTunnelFragment(long messageId, int fragmentId) { + public void receiveTunnelFragment(long messageId, int fragmentId, String status) { if (!_doLog) return; if (messageId == -1) throw new IllegalArgumentException("why are you -1?"); StringBuffer buf = new StringBuffer(48); buf.append(getPrefix()); buf.append("Receive fragment ").append(fragmentId).append(" in ").append(messageId); + buf.append(" status: ").append(status); addEntry(buf.toString()); } public void receiveTunnelFragmentComplete(long messageId) { @@ -420,12 +442,13 @@ public class MessageHistory { buf.append("Receive fragmented message completely: ").append(messageId); addEntry(buf.toString()); } - public void droppedFragmentedMessage(long messageId) { + public void droppedFragmentedMessage(long messageId, String status) { if (!_doLog) return; if (messageId == -1) throw new IllegalArgumentException("why are you -1?"); StringBuffer buf = new StringBuffer(48); buf.append(getPrefix()); buf.append("Fragmented message dropped: ").append(messageId); + buf.append(" ").append(status); addEntry(buf.toString()); } public void fragmentMessage(long messageId, int numFragments) { @@ -436,6 +459,15 @@ public class MessageHistory { buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments); addEntry(buf.toString()); } + public void fragmentMessage(long messageId, int numFragments, String tunnel) { + if (!_doLog) return; + if (messageId == -1) throw new IllegalArgumentException("why are you -1?"); + StringBuffer buf = new StringBuffer(48); + buf.append(getPrefix()); + buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments); + buf.append(" on ").append(tunnel); + addEntry(buf.toString()); + } public void droppedTunnelDataMessageUnknown(long msgId, long tunnelId) { if (!_doLog) return; if (msgId == -1) throw new IllegalArgumentException("why are you -1?"); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index bd50e1adb8..bdf023fde5 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.188 $ $Date: 2005/04/20 14:15:25 $"; + public final static String ID = "$Revision: 1.189 $ $Date: 2005/04/20 15:14:19 $"; public final static String VERSION = "0.5.0.7"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 407bd07bba..fd7b380a88 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -24,6 +24,7 @@ public class ACKSender implements Runnable { _fragments = fragments; _transport = transport; _builder = new PacketBuilder(_context); + _context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", new long[] { 60*1000, 60*60*1000 }); } public void run() { @@ -32,6 +33,8 @@ public class ACKSender implements Runnable { if (peer != null) { List acks = peer.retrieveACKs(); if ( (acks != null) && (acks.size() > 0) ) { + _context.statManager().addRateData("udp.sendACKCount", acks.size(), 0); + _context.statManager().getStatLog().addData(peer.getRemoteHostString(), "udp.peer.sendACKCount", acks.size(), 0); UDPPacket ack = _builder.buildACK(peer, acks); if (_log.shouldLog(Log.INFO)) _log.info("Sending ACK for " + acks); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 76d0cd39bd..72d6349eb4 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import net.i2p.router.RouterContext; +import net.i2p.util.DecayingBloomFilter; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -28,8 +29,8 @@ public class InboundMessageFragments { private List _unsentACKs; /** list of messages (InboundMessageState) fully received but not interpreted yet */ private List _completeMessages; - /** list of message IDs (Long) recently received, so we can ignore in flight dups */ - private List _recentlyCompletedMessages; + /** list of message IDs recently received, so we can ignore in flight dups */ + private DecayingBloomFilter _recentlyCompletedMessages; private OutboundMessageFragments _outbound; private UDPTransport _transport; /** this can be broken down further, but to start, OneBigLock does the trick */ @@ -39,6 +40,8 @@ public class InboundMessageFragments { private static final int RECENTLY_COMPLETED_SIZE = 100; /** how frequently do we want to send ACKs to a peer? */ private static final int ACK_FREQUENCY = 200; + /** decay the recently completed every 2 minutes */ + private static final int DECAY_PERIOD = 120*1000; public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) { _context = ctx; @@ -46,7 +49,6 @@ public class InboundMessageFragments { _inboundMessages = new HashMap(64); _unsentACKs = new ArrayList(64); _completeMessages = new ArrayList(64); - _recentlyCompletedMessages = new ArrayList(RECENTLY_COMPLETED_SIZE); _outbound = outbound; _transport = transport; _context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); @@ -60,6 +62,11 @@ public class InboundMessageFragments { public void startup() { _alive = true; + // may want to extend the DecayingBloomFilter so we can use a smaller + // array size (currently its tuned for 10 minute rates for the + // messageValidator) + _recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 8); + I2PThread t = new I2PThread(new ACKSender(_context, this, _transport), "UDP ACK sender"); t.setDaemon(true); t.start(); @@ -70,6 +77,9 @@ public class InboundMessageFragments { } public void shutdown() { _alive = false; + if (_recentlyCompletedMessages != null) + _recentlyCompletedMessages.stopDecaying(); + _recentlyCompletedMessages = null; synchronized (_stateLock) { _completeMessages.clear(); _unsentACKs.clear(); @@ -112,8 +122,15 @@ public class InboundMessageFragments { for (int i = 0; i < fragments; i++) { Long messageId = new Long(data.readMessageId(i)); - if (_recentlyCompletedMessages.contains(messageId)) { + if (_recentlyCompletedMessages.isKnown(messageId.longValue())) { _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0); + from.messageFullyReceived(messageId); + if (!_unsentACKs.contains(from)) + _unsentACKs.add(from); + if (_log.shouldLog(Log.WARN)) + _log.warn("Message received is a dup: " + messageId + " dups: " + + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " + + _recentlyCompletedMessages.getInsertedCount()); continue; } @@ -132,9 +149,7 @@ public class InboundMessageFragments { messageComplete = true; messages.remove(messageId); - while (_recentlyCompletedMessages.size() >= RECENTLY_COMPLETED_SIZE) - _recentlyCompletedMessages.remove(0); - _recentlyCompletedMessages.add(messageId); + _recentlyCompletedMessages.add(messageId.longValue()); _completeMessages.add(state); @@ -169,12 +184,15 @@ public class InboundMessageFragments { long acks[] = data.readACKs(); if (acks != null) { _context.statManager().addRateData("udp.receivedACKs", acks.length, 0); + _context.statManager().getStatLog().addData(from.getRemoteHostString(), "udp.peer.receiveACKCount", acks.length, 0); + for (int i = 0; i < acks.length; i++) { if (_log.shouldLog(Log.INFO)) _log.info("Full ACK of message " + acks[i] + " received!"); fragments += _outbound.acked(acks[i], from.getRemotePeer()); } - from.messageACKed(fragments * from.getMTU()); // estimated size + } else { + _log.error("Received ACKs with no acks?! " + data); } } if (data.readECN()) @@ -212,7 +230,8 @@ public class InboundMessageFragments { synchronized (_stateLock) { for (int i = 0; i < _unsentACKs.size(); i++) { PeerState peer = (PeerState)_unsentACKs.get(i); - if (peer.getLastACKSend() + ACK_FREQUENCY <= now) { + if ( (peer.getLastACKSend() + ACK_FREQUENCY <= now) || + (peer.unsentACKThresholdReached()) ) { _unsentACKs.remove(i); peer.setLastACKSend(now); return peer; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index fcc43492c1..32619896a9 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -31,6 +31,8 @@ public class OutboundMessageFragments { /** which message should we build the next packet out of? */ private int _nextPacketMessage; private PacketBuilder _builder; + /** if we can handle more messages explicitly, set this to true */ + private boolean _allowExcess; private static final int MAX_ACTIVE = 64; // don't send a packet more than 10 times @@ -44,12 +46,14 @@ public class OutboundMessageFragments { _nextPacketMessage = 0; _builder = new PacketBuilder(ctx); _alive = true; + _allowExcess = false; _context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmFragments", "How many fragments are included in a fully ACKed message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmVolley", "How many times did fragments need to be sent before ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendFailed", "How many fragments were in a message that couldn't be delivered", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the active pool when a new one is added", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); } public void startup() { _alive = true; } @@ -75,7 +79,9 @@ public class OutboundMessageFragments { return false; else if (_activeMessages.size() < MAX_ACTIVE) return true; - else + else if (_allowExcess) + return true; + else _activeMessages.wait(); } } catch (InterruptedException ie) {} @@ -90,12 +96,16 @@ public class OutboundMessageFragments { public void add(OutNetMessage msg) { OutboundMessageState state = new OutboundMessageState(_context); boolean ok = state.initialize(msg); + state.setPeer(_transport.getPeerState(msg.getTarget().getIdentity().calculateHash())); finishMessages(); + int active = 0; synchronized (_activeMessages) { if (ok) _activeMessages.add(state); + active = _activeMessages.size(); _activeMessages.notifyAll(); } + _context.statManager().addRateData("udp.outboundActiveCount", active, 0); } /** @@ -156,6 +166,9 @@ public class OutboundMessageFragments { } } + private static final long SECOND_MASK = 1023l; + + /** * Grab the next packet that we want to send, blocking until one is ready. * This is the main driver for the packet scheduler @@ -208,17 +221,31 @@ public class OutboundMessageFragments { + " remaining" + " for message " + state.getMessageId() + ": " + state); + if (state.justBeganVolley() && (state.getPushCount() > 0) && (state.getFragmentCount() > 1)) { + peer.messageRetransmitted(); + if (_log.shouldLog(Log.ERROR)) + _log.error("Retransmitting " + state + " to " + peer); + } + // for fairness, we move on in a round robin _nextPacketMessage = i + 1; - if (state.getPushCount() != oldVolley) { + if (currentFragment >= state.getFragmentCount() - 1) { + // this is the last fragment _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount()); - state.setNextSendTime(now + (1000-(now%1000)) + _context.random().nextInt(4000)); + if (state.getPeer() != null) { + int rto = state.getPeer().getRTO() * state.getPushCount(); + //_log.error("changed volley, rto=" + rto + " volley="+ state.getPushCount()); + state.setNextSendTime(now + rto); + } else { + _log.error("changed volley, unknown peer"); + state.setNextSendTime(now + 1000 + _context.random().nextInt(2000)); + } } else { if (peer.getSendWindowBytesRemaining() > 0) state.setNextSendTime(now); else - state.setNextSendTime(now + (1000-(now%1000))); + state.setNextSendTime((now + 1024) & ~SECOND_MASK); } break; } else { @@ -226,7 +253,7 @@ public class OutboundMessageFragments { _log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes() + " available=" + peer.getSendWindowBytesRemaining() + " for message " + state.getMessageId() + ": " + state); - state.setNextSendTime(now + (1000-(now%1000))); + state.setNextSendTime((now + 1024) & ~SECOND_MASK); currentFragment = -1; } } @@ -234,7 +261,7 @@ public class OutboundMessageFragments { long time = state.getNextSendTime(); if ( (nextSend < 0) || (time < nextSend) ) nextSend = time; - } + } // end of the for(activeMessages) if (currentFragment < 0) { if (nextSend <= 0) { @@ -248,13 +275,16 @@ public class OutboundMessageFragments { delay = 10; if (delay > 1000) delay = 1000; + _allowExcess = true; + _activeMessages.notifyAll(); try { _activeMessages.wait(delay); } catch (InterruptedException ie) {} } } - } - } + _allowExcess = false; + } // end of the synchronized block + } // end of the while (alive && !found) if (currentFragment >= 0) { if (_log.shouldLog(Log.INFO)) @@ -269,8 +299,8 @@ public class OutboundMessageFragments { } private static final int SSU_HEADER_SIZE = 46; - private static final int UDP_HEADER_SIZE = 8; - private static final int IP_HEADER_SIZE = 20; + static final int UDP_HEADER_SIZE = 8; + static final int IP_HEADER_SIZE = 20; /** how much payload data can we shove in there? */ private static final int fragmentSize(int mtu) { return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE; @@ -309,17 +339,23 @@ public class OutboundMessageFragments { } if (state != null) { + int numSends = state.getMaxSends(); if (_log.shouldLog(Log.INFO)) _log.info("Received ack of " + messageId + " by " + ackedBy.toBase64() - + " after " + state.getLifetime()); + + " after " + state.getLifetime() + " and " + numSends + " sends"); _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); - int numSends = state.getMaxSends(); _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); if ( (numSends > 1) && (state.getPeer() != null) ) state.getPeer().congestionOccurred(); _transport.succeeded(state.getMessage()); int numFragments = state.getFragmentCount(); + if (state.getPeer() != null) { + // this adjusts the rtt/rto/window/etc + state.getPeer().messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends()); + } else { + _log.warn("message acked, but no peer attacked: " + state); + } state.releaseResources(); return numFragments; } else { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 5e3e92a135..1f73553ea6 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -31,6 +31,7 @@ public class OutboundMessageState { private long _nextSendTime; private int _pushCount; private short _maxSends; + private int _nextSendFragment; public static final int MAX_FRAGMENTS = 32; private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024); @@ -40,6 +41,7 @@ public class OutboundMessageState { _log = _context.logManager().getLog(OutboundMessageState.class); _pushCount = 0; _maxSends = 0; + _nextSendFragment = 0; } public synchronized boolean initialize(OutNetMessage msg) { @@ -100,6 +102,7 @@ public class OutboundMessageState { public OutNetMessage getMessage() { return _message; } public long getMessageId() { return _messageId; } public PeerState getPeer() { return _peer; } + public void setPeer(PeerState peer) { _peer = peer; } public boolean isExpired() { return _expiration < _context.clock().now(); } @@ -160,6 +163,7 @@ public class OutboundMessageState { else return _fragmentSends.length; } + public int getFragmentSize() { return _fragmentSize; } /** should we continue sending this fragment? */ public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; } public synchronized int fragmentSize(int fragmentNum) { @@ -178,6 +182,17 @@ public class OutboundMessageState { * @return fragment index, or -1 if all of the fragments were acked */ public int pickNextFragment() { + if (true) { + int rv = _nextSendFragment; + _fragmentSends[rv]++; + _maxSends = _fragmentSends[rv]; + _nextSendFragment++; + if (_nextSendFragment >= _fragmentSends.length) { + _nextSendFragment = 0; + _pushCount++; + } + return rv; + } short minValue = -1; int minIndex = -1; int startOffset = _context.random().nextInt(_fragmentSends.length); @@ -207,9 +222,9 @@ public class OutboundMessageState { break; } } - if (endOfVolley) + if (endOfVolley) { _pushCount++; - + } if (_log.shouldLog(Log.DEBUG)) { StringBuffer buf = new StringBuffer(64); @@ -223,6 +238,13 @@ public class OutboundMessageState { } return minIndex; } + + public boolean justBeganVolley() { + if (_fragmentSends.length == 1) + return true; + else + return _nextSendFragment == 1; + } /** * Write a part of the the message onto the specified buffer. diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 3c6d4928d3..7c6b14d44a 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -113,6 +113,12 @@ public class PeerState { private int _mtu; /** when did we last check the MTU? */ private long _mtuLastChecked; + /** current round trip time estimate */ + private int _rtt; + /** smoothed mean deviation in the rtt */ + private int _rttDeviation; + /** current retransmission timeout */ + private int _rto; private long _messagesReceived; private long _messagesSent; @@ -120,7 +126,7 @@ public class PeerState { private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024; private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; - private static final int DEFAULT_MTU = 1024; + private static final int DEFAULT_MTU = 1492; public PeerState(I2PAppContext ctx) { _context = ctx; @@ -153,6 +159,9 @@ public class PeerState { _mtu = DEFAULT_MTU; _mtuLastChecked = -1; _lastACKSend = -1; + _rtt = 1000; + _rttDeviation = _rtt; + _rto = 6000; _messagesReceived = 0; _messagesSent = 0; } @@ -328,6 +337,7 @@ public class PeerState { _sendWindowBytesRemaining = _sendWindowBytes; _lastSendRefill = now; } + //if (true) return true; if (size <= _sendWindowBytesRemaining) { _sendWindowBytesRemaining -= size; _lastSendTime = now; @@ -393,15 +403,22 @@ public class PeerState { /** pull off the ACKs (Long) to send to the peer */ public List retrieveACKs() { List rv = null; + int threshold = countMaxACKs(); synchronized (_currentACKs) { - rv = new ArrayList(_currentACKs); - _currentACKs.clear(); + if (_currentACKs.size() < threshold) { + rv = new ArrayList(_currentACKs); + _currentACKs.clear(); + } else { + rv = new ArrayList(threshold); + for (int i = 0; i < threshold; i++) + rv.add(_currentACKs.remove(0)); + } } return rv; } /** we sent a message which was ACKed containing the given # of bytes */ - public void messageACKed(int bytesACKed) { + public void messageACKed(int bytesACKed, long lifetime, int numSends) { _consecutiveSendingSecondsWithoutACKs = 0; if (_sendWindowBytes <= _slowStartThreshold) { _sendWindowBytes += bytesACKed; @@ -414,7 +431,35 @@ public class PeerState { _sendWindowBytes = MAX_SEND_WINDOW_BYTES; _lastReceiveTime = _context.clock().now(); _messagesSent++; + if (numSends <= 2) + recalculateTimeouts(lifetime); + else + _log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); } + + /** adjust the tcp-esque timeouts */ + private void recalculateTimeouts(long lifetime) { + _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation)); + _rtt = (int)((float)_rtt*(0.9f) + (0.1f)*(float)lifetime); + _rto = _rtt + (_rttDeviation<<2); + if (_log.shouldLog(Log.WARN)) + _log.warn("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt + + " rttDev=" + _rttDeviation + " rto=" + _rto); + if (_rto < 1000) + _rto = 1000; + if (_rto > 5000) + _rto = 5000; + } + /** we are resending a packet, so lets jack up the rto */ + public void messageRetransmitted() { + //_rto *= 2; + } + /** how long does it usually take to get a message ACKed? */ + public int getRTT() { return _rtt; } + /** how soon should we retransmit an unacked packet? */ + public int getRTO() { return _rto; } + /** how skewed are the measured RTTs? */ + public long getRTTDeviation() { return _rttDeviation; } public long getMessagesSent() { return _messagesSent; } public long getMessagesReceived() { return _messagesReceived; } @@ -435,6 +480,25 @@ public class PeerState { /** when did we last send an ACK to the peer? */ public long getLastACKSend() { return _lastACKSend; } public void setLastACKSend(long when) { _lastACKSend = when; } + public boolean unsentACKThresholdReached() { + int threshold = countMaxACKs(); + synchronized (_currentACKs) { + return _currentACKs.size() >= threshold; + } + } + private int countMaxACKs() { + return (_mtu + - OutboundMessageFragments.IP_HEADER_SIZE + - OutboundMessageFragments.UDP_HEADER_SIZE + - UDPPacket.IV_SIZE + - UDPPacket.MAC_SIZE + - 1 // type flag + - 4 // timestamp + - 1 // data flag + - 1 // # ACKs + - 16 // padding safety + ) / 4; + } public String getRemoteHostString() { return _remoteHostString; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index d7e1403ae3..4d340f19a8 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -24,7 +24,7 @@ import net.i2p.util.Log; */ public class UDPPacket { private I2PAppContext _context; - private Log _log; + private static Log _log; private DatagramPacket _packet; private short _priority; private long _initializeTime; @@ -35,6 +35,7 @@ public class UDPPacket { private static final List _packetCache; static { _packetCache = new ArrayList(256); + _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); } private static final boolean CACHE = false; @@ -67,7 +68,6 @@ public class UDPPacket { private UDPPacket(I2PAppContext ctx) { _context = ctx; - _log = ctx.logManager().getLog(UDPPacket.class); _dataBuf = _dataCache.acquire(); _data = _dataBuf.getData(); _packet = new DatagramPacket(_data, MAX_PACKET_SIZE); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 4fb1f7743a..7b30df4808 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -86,7 +86,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; /** should we flood all UDP peers with the configured rate? */ - private static final boolean SHOULD_FLOOD_PEERS = false; + private static final boolean SHOULD_FLOOD_PEERS = true; public UDPTransport(RouterContext ctx) { super(ctx); @@ -492,6 +492,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(" PeerLocation\n"); buf.append(" Last sendLast recv\n"); buf.append(" Lifetimecwndssthresh\n"); + buf.append(" rttdevrto\n"); buf.append(" SentReceived\n"); buf.append(" \n"); out.write(buf.toString()); @@ -534,13 +535,25 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(""); buf.append(""); - buf.append(peer.getSendWindowBytes()); + buf.append(peer.getSendWindowBytes()/1024); + buf.append("K"); + + buf.append(""); + buf.append(peer.getSlowStartThreshold()/1024); + buf.append("K"); + + buf.append(""); + buf.append(peer.getRTT()); + buf.append(""); + + buf.append(""); + buf.append(peer.getRTTDeviation()); buf.append(""); buf.append(""); - buf.append(peer.getSlowStartThreshold()); + buf.append(peer.getRTO()); buf.append(""); - + buf.append(""); buf.append(peer.getMessagesSent()); buf.append(""); diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index b213f57384..74b2794aee 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -52,8 +52,10 @@ public class BatchedPreprocessor extends TrivialPreprocessor { // loops because sends may be partial TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(0); send(pending, 0, 0, sender, rec); - if (msg.getOffset() >= msg.getData().length) + if (msg.getOffset() >= msg.getData().length) { + notePreprocessing(msg.getMessageId(), msg.getFragmentNumber()); pending.remove(0); + } } return false; } @@ -84,11 +86,14 @@ public class BatchedPreprocessor extends TrivialPreprocessor { _log.info("Allocated=" + allocated + " so we sent " + (i+1) + " (last complete? " + (msg.getOffset() >= msg.getData().length) + ")"); - for (int j = 0; j < i; j++) - pending.remove(0); + for (int j = 0; j < i; j++) { + TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); + notePreprocessing(cur.getMessageId(), cur.getFragmentNumber()); + } if (msg.getOffset() >= msg.getData().length) { // ok, this last message fit perfectly, remove it too - pending.remove(0); + TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); + notePreprocessing(cur.getMessageId(), cur.getFragmentNumber()); } if (i > 0) _context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0); @@ -113,7 +118,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor { _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0); send(pending, 0, pending.size()-1, sender, rec); - pending.clear(); + + while (pending.size() > 0) { + TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); + notePreprocessing(cur.getMessageId(), cur.getFragmentNumber()); + } _pendingSince = 0; return false; } else { diff --git a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java index 7c8552e9d9..01e42bfb10 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java @@ -11,6 +11,7 @@ import net.i2p.router.RouterContext; public class BatchedRouterPreprocessor extends BatchedPreprocessor { private RouterContext _routerContext; private TunnelCreatorConfig _config; + private HopConfig _hopConfig; /** * How frequently should we flush non-full messages, in milliseconds @@ -20,13 +21,18 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor { public static final int DEFAULT_BATCH_FREQUENCY = 500; public BatchedRouterPreprocessor(RouterContext ctx) { - this(ctx, null); + this(ctx, (HopConfig)null); } public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) { super(ctx); _routerContext = ctx; _config = cfg; } + public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) { + super(ctx); + _routerContext = ctx; + _hopConfig = cfg; + } /** how long should we wait before flushing */ protected long getSendDelay() { @@ -50,6 +56,9 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor { } protected void notePreprocessing(long messageId, int numFragments) { - _routerContext.messageHistory().fragmentMessage(messageId, numFragments); + if (_config != null) + _routerContext.messageHistory().fragmentMessage(messageId, numFragments, _config.toString()); + else + _routerContext.messageHistory().fragmentMessage(messageId, numFragments, _hopConfig.toString()); } } diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index 496eb930f0..74a2f9e3fd 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -263,7 +263,7 @@ public class FragmentHandler { SimpleTimer.getInstance().removeEvent(msg.getExpireEvent()); receiveComplete(msg); } else { - noteReception(msg.getMessageId(), 0); + noteReception(msg.getMessageId(), 0, msg.toString()); } if (isNew && fragmented && !msg.isComplete()) { @@ -325,7 +325,7 @@ public class FragmentHandler { _context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime()); receiveComplete(msg); } else { - noteReception(msg.getMessageId(), fragmentNum); + noteReception(msg.getMessageId(), fragmentNum, msg.toString()); } if (isNew && !msg.isComplete()) { @@ -359,9 +359,9 @@ public class FragmentHandler { } } - protected void noteReception(long messageId, int fragmentId) {} + protected void noteReception(long messageId, int fragmentId, String status) {} protected void noteCompletion(long messageId) {} - protected void noteFailure(long messageId) {} + protected void noteFailure(long messageId, String status) {} /** * Receive messages out of the tunnel endpoint. There should be a single @@ -393,7 +393,7 @@ public class FragmentHandler { } if (removed && !_msg.getReleased()) { _failed++; - noteFailure(_msg.getMessageId()); + noteFailure(_msg.getMessageId(), _msg.toString()); if (_log.shouldLog(Log.WARN)) _log.warn("Dropped failed fragmented message: " + _msg); _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime()); diff --git a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java index cf671c6047..c5a3140145 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java @@ -84,7 +84,7 @@ public class FragmentedMessage { + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid())); _fragments[fragmentNum] = ba; - _lastReceived = isLast; + _lastReceived = _lastReceived || isLast; if (fragmentNum > _highFragmentNum) _highFragmentNum = fragmentNum; if (isLast && fragmentNum <= 0) @@ -119,7 +119,7 @@ public class FragmentedMessage { _log.debug("fragment[0/" + offset + "/" + length + "]: " + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid())); _fragments[0] = ba; - _lastReceived = isLast; + _lastReceived = _lastReceived || isLast; _toRouter = toRouter; _toTunnel = toTunnel; if (_highFragmentNum < 0) diff --git a/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java b/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java index 3d712699f3..bb0b3920d1 100644 --- a/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java @@ -1,25 +1,34 @@ package net.i2p.router.tunnel; import net.i2p.router.RouterContext; +import net.i2p.util.Log; /** * Minor extension to allow message history integration */ public class RouterFragmentHandler extends FragmentHandler { private RouterContext _routerContext; + private Log _log; public RouterFragmentHandler(RouterContext context, DefragmentedReceiver receiver) { super(context, receiver); _routerContext = context; + _log = context.logManager().getLog(RouterFragmentHandler.class); } - protected void noteReception(long messageId, int fragmentId) { - _routerContext.messageHistory().receiveTunnelFragment(messageId, fragmentId); + protected void noteReception(long messageId, int fragmentId, String status) { + if (_log.shouldLog(Log.INFO)) + _log.info("Received fragment " + fragmentId + " for message " + messageId + ": " + status); + _routerContext.messageHistory().receiveTunnelFragment(messageId, fragmentId, status); } protected void noteCompletion(long messageId) { + if (_log.shouldLog(Log.INFO)) + _log.info("Received complete message " + messageId); _routerContext.messageHistory().receiveTunnelFragmentComplete(messageId); } - protected void noteFailure(long messageId) { - _routerContext.messageHistory().droppedFragmentedMessage(messageId); + protected void noteFailure(long messageId, String status) { + if (_log.shouldLog(Log.INFO)) + _log.info("Dropped message " + messageId + ": " + status); + _routerContext.messageHistory().droppedFragmentedMessage(messageId, status); } } diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index a435e28a42..6361d6e255 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -225,7 +225,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { offset += payloadLength; msg.setOffset(msg.getOffset() + payloadLength); - msg.incrementFragmentNumber(); + if (fragmented) + msg.incrementFragmentNumber(); return offset; } @@ -264,7 +265,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { offset += payloadLength; - msg.incrementFragmentNumber(); + if (!isLast) + msg.incrementFragmentNumber(); msg.setOffset(msg.getOffset() + payloadLength); return offset; } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 3345217729..c8bfd9e2b1 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -107,8 +107,11 @@ public class TunnelDispatcher implements Service { new long[] { 60*10*1000l, 60*60*1000l, 24*60*60*1000l }); } - private TunnelGateway.QueuePreprocessor createPreprocessor() { - return createPreprocessor(null); + private TunnelGateway.QueuePreprocessor createPreprocessor(HopConfig cfg) { + if (true) + return new BatchedRouterPreprocessor(_context, cfg); + else + return new TrivialRouterPreprocessor(_context); } private TunnelGateway.QueuePreprocessor createPreprocessor(TunnelCreatorConfig cfg) { if (true) @@ -133,6 +136,7 @@ public class TunnelDispatcher implements Service { _outboundGateways.put(outId, gw); } _context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0); + _context.messageHistory().tunnelJoined("outbound", cfg); } else { TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelId outId = cfg.getConfig(0).getSendTunnel(); @@ -140,6 +144,7 @@ public class TunnelDispatcher implements Service { _outboundGateways.put(outId, gw); } _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0); + _context.messageHistory().tunnelJoined("outboundZeroHop", cfg); } } /** @@ -156,6 +161,7 @@ public class TunnelDispatcher implements Service { _participants.put(recvId, participant); } _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0); + _context.messageHistory().tunnelJoined("inboundEndpoint", cfg); } else { TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelId recvId = cfg.getConfig(0).getReceiveTunnel(); @@ -163,6 +169,7 @@ public class TunnelDispatcher implements Service { _inboundGateways.put(recvId, gw); } _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0); + _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg); } } @@ -183,6 +190,7 @@ public class TunnelDispatcher implements Service { _participatingConfig.put(recvId, cfg); numParticipants = _participatingConfig.size(); } + _context.messageHistory().tunnelJoined("participant", cfg); _context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0); _context.statManager().addRateData("tunnel.joinParticipant", 1, 0); if (cfg.getExpiration() > _lastParticipatingExpiration) @@ -206,6 +214,7 @@ public class TunnelDispatcher implements Service { _participatingConfig.put(recvId, cfg); numParticipants = _participatingConfig.size(); } + _context.messageHistory().tunnelJoined("outboundEndpoint", cfg); _context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0); _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0); @@ -221,7 +230,7 @@ public class TunnelDispatcher implements Service { public void joinInboundGateway(HopConfig cfg) { if (_log.shouldLog(Log.INFO)) _log.info("Joining as inbound gateway: " + cfg); - TunnelGateway.QueuePreprocessor preproc = createPreprocessor(); + TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg); TunnelGateway.Sender sender = new InboundSender(_context, cfg); TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg); TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); @@ -234,6 +243,7 @@ public class TunnelDispatcher implements Service { _participatingConfig.put(recvId, cfg); numParticipants = _participatingConfig.size(); } + _context.messageHistory().tunnelJoined("inboundGateway", cfg); _context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0); _context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0); @@ -346,6 +356,8 @@ public class TunnelDispatcher implements Service { if (_log.shouldLog(Log.DEBUG)) _log.debug("dispatch to participant " + participant + ": " + msg.getUniqueId() + " from " + recvFrom.toBase64().substring(0,4)); + _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel " + + msg.getTunnelId().getTunnelId() + " as participant"); participant.dispatch(msg, recvFrom); _context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0); } else { @@ -358,7 +370,10 @@ public class TunnelDispatcher implements Service { if (_log.shouldLog(Log.DEBUG)) _log.debug("dispatch where we are the outbound endpoint: " + endpoint + ": " + msg + " from " + recvFrom.toBase64().substring(0,4)); + _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel " + + msg.getTunnelId().getTunnelId() + " as outbound endpoint"); endpoint.dispatch(msg, recvFrom); + _context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0); } else { _context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId()); @@ -397,6 +412,8 @@ public class TunnelDispatcher implements Service { + msg.getMessage().getClass().getName()); return; } + _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + "/" + msg.getMessage().getUniqueId() + " on tunnel " + + msg.getTunnelId().getTunnelId() + " as inbound gateway"); gw.add(msg); _context.statManager().addRateData("tunnel.dispatchInbound", 1, 0); } else { @@ -464,6 +481,9 @@ public class TunnelDispatcher implements Service { + (before-msg.getMessageExpiration()) + "ms ago? " + msg, new Exception("cause")); } + _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel " + + outboundTunnel + "/" + targetTunnel + " to " + + targetPeer + " as outbound gateway"); gw.add(msg, targetPeer, targetTunnel); if (targetTunnel == null) _context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1, 0);