2005-04-24  jrandom

    * Added a pool of PRNGs using a different synchronization technique,
      hopefully sufficient to work around IBM's PRNG bugs until we get our
      own Fortuna.
    * In the streaming lib, don't jack up the RTT on NACK, and have the window
      size bound the not-yet-ready messages to the peer, not the unacked
      message count (not sure yet whether this is worthwhile).
    * Many additions to the messageHistory log.
    * Handle out-of-order tunnel fragment delivery (not an issue on the live
      net with TCP, but critical with UDP).

And for the UDP stuff:
    * Implemented TCP-esque RTO code in the UDP transport (see the first
      sketch after this list).
    * Make sure we don't ACK too many messages at once.
    * Transmit fragments in a simple (nonrandom) order so that we can more
      easily adjust timeouts, etc.
    * Let the active outbound pool grow dynamically if there are outbound
      slots to spare.
    * Use a simple decaying bloom filter at the UDP level to drop duplicate
      resent packets (see the second sketch after this list).
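
The TCP-esque RTO code keeps a smoothed RTT and mean-deviation estimate per peer,
in the style of TCP's retransmission timer. A minimal sketch of that calculation,
using hypothetical class and field names (the committed version lives in PeerState
further down in this diff, in milliseconds, clamped to 1-5 seconds):

    // Sketch of the per-peer retransmission-timeout estimate; "lifetime" is the
    // measured delay of a message that was just fully ACKed, in milliseconds.
    public class RtoEstimator {
        private int _rtt = 1000;          // smoothed round trip time estimate
        private int _rttDeviation = 1000; // smoothed mean deviation of the RTT
        private int _rto = 6000;          // current retransmission timeout

        public void recalculate(long lifetime) {
            _rttDeviation += (int)(0.25d * (Math.abs(lifetime - _rtt) - _rttDeviation));
            _rtt = (int)(_rtt * 0.9f + lifetime * 0.1f);
            _rto = _rtt + (_rttDeviation << 2);   // rto = rtt + 4 * deviation
            if (_rto < 1000) _rto = 1000;
            if (_rto > 5000) _rto = 5000;
        }

        public int getRTO() { return _rto; }
    }

Retransmissions are then scheduled at roughly getRTO() times the number of volleys
already pushed, rather than at a fixed one-second boundary.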
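
The duplicate drop at the UDP level queries a decaying bloom filter for each
incoming message ID before processing its fragments, and only inserts the ID once
the message has been fully received. A rough sketch of that usage pattern, with
hypothetical helper names (scheduleACK, assembleFragment) around the isKnown()/add()
calls this commit introduces; the real wiring is in InboundMessageFragments below:

    // Assumed context: ctx is the router context; assembleFragment() and
    // scheduleACK() stand in for the real reassembly and ACK bookkeeping.
    DecayingBloomFilter recentlyCompleted =
        new DecayingBloomFilter(ctx, 2*60*1000, 8); // decay every 2 minutes, 8-byte IDs

    void receiveFragment(long messageId, UDPPacket frag, PeerState from) {
        if (recentlyCompleted.isKnown(messageId)) {
            // already handled this message: re-ACK it, but don't reprocess it
            scheduleACK(from);
            return;
        }
        boolean complete = assembleFragment(messageId, frag);
        if (complete)
            recentlyCompleted.add(messageId); // remembered until the filter decays
    }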
Author: jrandom
Date: 2005-04-24 18:42:02 +00:00
Committed by: zzz
Parent: dae6be14b7
Commit: b2f0d17e94
25 changed files with 517 additions and 97 deletions


@@ -152,7 +152,8 @@ public class Connection {
         if (!_connected)
             return false;
         started = true;
-        if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ) {
+        if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ||
+             (_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) {
             if (writeExpire > 0) {
                 if (timeLeft <= 0) {
                     _log.error("Outbound window is full of " + _outboundPackets.size()


@@ -167,11 +167,14 @@ public class ConnectionPacketHandler {
             // non-ack message payloads are queued in the MessageInputStream
             packet.releasePayload();
         }
+        //if (choke)
+        //    con.fastRetransmit();
     }
     private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) {
-        if ( (nacks != null) && (nacks.length > 0) )
-            con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000);
+        //if ( (nacks != null) && (nacks.length > 0) )
+        //    con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000);
         int numResends = 0;
         List acked = con.ackPackets(ackThrough, nacks);


@@ -22,6 +22,7 @@ import net.i2p.stat.StatManager;
 import net.i2p.util.Clock;
 import net.i2p.util.LogManager;
 import net.i2p.util.RandomSource;
+import net.i2p.util.PooledRandomSource;

 /**
  * <p>Provide a base scope for accessing singletons that I2P exposes. Rather than
@@ -432,7 +433,7 @@ public class I2PAppContext {
     private void initializeRandom() {
         synchronized (this) {
             if (_random == null)
-                _random = new RandomSource(this);
+                _random = new PooledRandomSource(this);
             _randomInitialized = true;
         }
     }


@@ -108,7 +108,29 @@ public class DecayingBloomFilter {
         }
     }
+
+    /**
+     * return true if the entry is already known.  this does NOT add the
+     * entry however.
+     *
+     */
+    public boolean isKnown(long entry) {
+        synchronized (this) {
+            if (_entryBytes <= 7)
+                entry &= _longToEntryMask;
+            if (entry < 0) {
+                DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry);
+                _longToEntry[0] |= (1 << 7);
+            } else {
+                DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
+            }
+            return locked_add(_longToEntry, false);
+        }
+    }
+
     private boolean locked_add(byte entry[]) {
+        return locked_add(entry, true);
+    }
+    private boolean locked_add(byte entry[], boolean addIfNew) {
         if (_extended != null) {
             // extend the entry to 32 bytes
             System.arraycopy(entry, 0, _extended, 0, entry.length);
@@ -121,8 +143,10 @@ public class DecayingBloomFilter {
                 _currentDuplicates++;
                 return true;
             } else {
+                if (addIfNew) {
                     _current.insert(_extended);
                     _previous.insert(_extended);
+                }
                 return false;
             }
         } else {
@@ -132,8 +156,10 @@ public class DecayingBloomFilter {
                 _currentDuplicates++;
                 return true;
             } else {
+                if (addIfNew) {
                     _current.locked_insert(entry);
                     _previous.locked_insert(entry);
+                }
                 return false;
             }
         }


@@ -140,8 +140,8 @@ public class LogManager {
     public Log getLog(String name) { return getLog(null, name); }
     public Log getLog(Class cls, String name) {
         Log rv = null;
-        synchronized (_logs) {
         String scope = Log.getScope(name, cls);
+        synchronized (_logs) {
             rv = (Log)_logs.get(scope);
             if (rv == null) {
                 rv = new Log(this, cls, name);
@@ -154,10 +154,7 @@ public class LogManager {
     public List getLogs() {
         List rv = null;
         synchronized (_logs) {
-            rv = new ArrayList(_logs.size());
-            for (Iterator iter = _logs.values().iterator(); iter.hasNext(); ) {
-                rv.add(iter.next());
-            }
+            rv = new ArrayList(_logs.values());
         }
         return rv;
     }


@@ -0,0 +1,141 @@
+package net.i2p.util;
+/*
+ * free (adj.): unencumbered; not under the control of others
+ * Written by jrandom in 2005 and released into the public domain
+ * with no warranty of any kind, either expressed or implied.
+ * It probably won't make your computer catch on fire, or eat
+ * your children, but it might.  Use at your own risk.
+ *
+ */
+
+import net.i2p.I2PAppContext;
+import net.i2p.crypto.EntropyHarvester;
+
+/**
+ * Maintain a set of PRNGs to feed the apps
+ */
+public class PooledRandomSource extends RandomSource {
+    private Log _log;
+    private RandomSource _pool[];
+    private volatile int _nextPool;
+
+    private static final int POOL_SIZE = 16;
+
+    public PooledRandomSource(I2PAppContext context) {
+        super(context);
+        _log = context.logManager().getLog(PooledRandomSource.class);
+        _pool = new RandomSource[POOL_SIZE];
+        for (int i = 0; i < POOL_SIZE; i++) {
+            _pool[i] = new RandomSource(context);
+            _pool[i].nextBoolean();
+        }
+        _nextPool = 0;
+    }
+
+    private final RandomSource pickPRNG() {
+        return _pool[(_nextPool++) % POOL_SIZE];
+    }
+
+    /**
+     * According to the java docs (http://java.sun.com/j2se/1.4.1/docs/api/java/util/Random.html#nextInt(int))
+     * nextInt(n) should return a number between 0 and n (including 0 and excluding n).  However, their pseudocode,
+     * as well as sun's, kaffe's, and classpath's implementation INCLUDES NEGATIVE VALUES.
+     * WTF.  Ok, so we're going to have it return between 0 and n (including 0, excluding n), since
+     * thats what it has been used for.
+     *
+     */
+    public int nextInt(int n) {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextInt(n);
+        }
+    }
+
+    /**
+     * Like the modified nextInt, nextLong(n) returns a random number from 0 through n,
+     * including 0, excluding n.
+     */
+    public long nextLong(long n) {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextLong(n);
+        }
+    }
+
+    /**
+     * override as synchronized, for those JVMs that don't always pull via
+     * nextBytes (cough ibm)
+     */
+    public boolean nextBoolean() {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextBoolean();
+        }
+    }
+
+    /**
+     * override as synchronized, for those JVMs that don't always pull via
+     * nextBytes (cough ibm)
+     */
+    public void nextBytes(byte buf[]) {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            prng.nextBytes(buf);
+        }
+    }
+
+    /**
+     * override as synchronized, for those JVMs that don't always pull via
+     * nextBytes (cough ibm)
+     */
+    public double nextDouble() {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextDouble();
+        }
+    }
+
+    /**
+     * override as synchronized, for those JVMs that don't always pull via
+     * nextBytes (cough ibm)
+     */
+    public float nextFloat() {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextFloat();
+        }
+    }
+
+    /**
+     * override as synchronized, for those JVMs that don't always pull via
+     * nextBytes (cough ibm)
+     */
+    public double nextGaussian() {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextGaussian();
+        }
+    }
+
+    /**
+     * override as synchronized, for those JVMs that don't always pull via
+     * nextBytes (cough ibm)
+     */
+    public int nextInt() {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextInt();
+        }
+    }
+
+    /**
+     * override as synchronized, for those JVMs that don't always pull via
+     * nextBytes (cough ibm)
+     */
+    public long nextLong() {
+        RandomSource prng = pickPRNG();
+        synchronized (prng) {
+            return prng.nextLong();
+        }
+    }
+
+    public EntropyHarvester harvester() {
+        RandomSource prng = pickPRNG();
+        return prng.harvester();
+    }
+}


@@ -42,7 +42,7 @@ public class RandomSource extends SecureRandom {
      * thats what it has been used for.
      *
      */
-    public synchronized int nextInt(int n) {
+    public int nextInt(int n) {
         if (n == 0) return 0;
         int val = super.nextInt(n);
         if (val < 0) val = 0 - val;
@@ -54,7 +54,7 @@ public class RandomSource extends SecureRandom {
      * Like the modified nextInt, nextLong(n) returns a random number from 0 through n,
      * including 0, excluding n.
      */
-    public synchronized long nextLong(long n) {
+    public long nextLong(long n) {
         long v = super.nextLong();
         if (v < 0) v = 0 - v;
         if (v >= n) v = v % n;
@@ -65,37 +65,37 @@ public class RandomSource extends SecureRandom {
     * override as synchronized, for those JVMs that don't always pull via
     * nextBytes (cough ibm)
     */
-    public synchronized boolean nextBoolean() { return super.nextBoolean(); }
+    public boolean nextBoolean() { return super.nextBoolean(); }
    /**
     * override as synchronized, for those JVMs that don't always pull via
     * nextBytes (cough ibm)
     */
-    public synchronized void nextBytes(byte buf[]) { super.nextBytes(buf); }
+    public void nextBytes(byte buf[]) { super.nextBytes(buf); }
    /**
     * override as synchronized, for those JVMs that don't always pull via
     * nextBytes (cough ibm)
     */
-    public synchronized double nextDouble() { return super.nextDouble(); }
+    public double nextDouble() { return super.nextDouble(); }
    /**
     * override as synchronized, for those JVMs that don't always pull via
     * nextBytes (cough ibm)
     */
-    public synchronized float nextFloat() { return super.nextFloat(); }
+    public float nextFloat() { return super.nextFloat(); }
    /**
     * override as synchronized, for those JVMs that don't always pull via
     * nextBytes (cough ibm)
     */
-    public synchronized double nextGaussian() { return super.nextGaussian(); }
+    public double nextGaussian() { return super.nextGaussian(); }
    /**
     * override as synchronized, for those JVMs that don't always pull via
     * nextBytes (cough ibm)
     */
-    public synchronized int nextInt() { return super.nextInt(); }
+    public int nextInt() { return super.nextInt(); }
    /**
     * override as synchronized, for those JVMs that don't always pull via
     * nextBytes (cough ibm)
     */
-    public synchronized long nextLong() { return super.nextLong(); }
+    public long nextLong() { return super.nextLong(); }

     public EntropyHarvester harvester() { return _entropyHarvester; }


@@ -1,4 +1,15 @@
-$Id: history.txt,v 1.196 2005/04/20 14:15:28 jrandom Exp $
+$Id: history.txt,v 1.197 2005/04/20 15:14:17 jrandom Exp $
+
+2005-04-24  jrandom
+    * Added a pool of PRNGs using a different synchronization technique,
+      hopefully sufficient to work around IBM's PRNG bugs until we get our
+      own Fortuna.
+    * In the streaming lib, don't jack up the RTT on NACK, and have the window
+      size bound the not-yet-ready messages to the peer, not the unacked
+      message count (not sure yet whether this is worthwile).
+    * Many additions to the messageHistory log.
+    * Handle out of order tunnel fragment delivery (not an issue on the live
+      net with TCP, but critical with UDP).

 * 2005-04-20  0.5.0.7 released


@@ -150,6 +150,7 @@ public class InNetMessagePool implements Service {
             shortCircuitTunnelData(messageBody, fromRouterHash);
             allowMatches = false;
         } else {
+            if ( (type > 0) && (type < _handlerJobBuilders.length) ) {
             HandlerJobBuilder builder = _handlerJobBuilders[type];
             if (_log.shouldLog(Log.DEBUG))
@@ -165,6 +166,7 @@ public class InNetMessagePool implements Service {
                 }
             }
         }
+            }
         if (allowMatches) {
             List origMessages = _context.messageRegistry().getOriginalMessages(messageBody);


@@ -12,6 +12,7 @@ import java.util.TimeZone;
 import net.i2p.data.Hash;
 import net.i2p.data.TunnelId;
 import net.i2p.data.i2np.I2NPMessage;
+import net.i2p.router.tunnel.HopConfig;
 import net.i2p.util.Log;

 /**
@@ -178,10 +179,33 @@ public class MessageHistory {
         if (tunnel == null) return;
         StringBuffer buf = new StringBuffer(128);
         buf.append(getPrefix());
-        buf.append("joining tunnel [").append(tunnel.getReceiveTunnelId(0).getTunnelId()).append("] as [").append(state).append("] ");
+        buf.append("joining as [").append(state);
+        buf.append("] to tunnel: ").append(tunnel.toString());
         addEntry(buf.toString());
     }
+
+    /**
+     * The local router has joined the given tunnel operating in the given state.
+     *
+     * @param state {"free inbound", "allocated inbound", "inactive inbound", "outbound", "participant", "pending"}
+     * @param tunnel tunnel joined
+     */
+    public void tunnelJoined(String state, HopConfig tunnel) {
+        if (!_doLog) return;
+        if (tunnel == null) return;
+        StringBuffer buf = new StringBuffer(128);
+        buf.append(getPrefix());
+        buf.append("joining as [").append(state);
+        buf.append("] to tunnel: ").append(tunnel.toString());
+        addEntry(buf.toString());
+    }
+
+    public void tunnelDispatched(String info) {
+        if (!_doLog) return;
+        if (info == null) return;
+        addEntry(getPrefix() + "tunnel dispatched: " + info);
+    }
+
     /**
      * The local router has detected a failure in the given tunnel
      *
@@ -352,9 +376,6 @@ public class MessageHistory {
         buf.append("from [").append(getName(from)).append("] ");
         buf.append("expiring on [").append(getTime(expiration)).append("] valid? ").append(isValid);
         addEntry(buf.toString());
-        if (messageType.equals("net.i2p.data.i2np.TunnelMessage")) {
-            //_log.warn("ReceiveMessage tunnel message ["+messageId+"]", new Exception("Receive tunnel"));
-        }
     }
     public void receiveMessage(String messageType, long messageId, long expiration, boolean isValid) {
         receiveMessage(messageType, messageId, expiration, null, isValid);
@@ -404,12 +425,13 @@ public class MessageHistory {
         addEntry(buf.toString());
     }
-    public void receiveTunnelFragment(long messageId, int fragmentId) {
+    public void receiveTunnelFragment(long messageId, int fragmentId, String status) {
         if (!_doLog) return;
         if (messageId == -1) throw new IllegalArgumentException("why are you -1?");
         StringBuffer buf = new StringBuffer(48);
         buf.append(getPrefix());
         buf.append("Receive fragment ").append(fragmentId).append(" in ").append(messageId);
+        buf.append(" status: ").append(status);
         addEntry(buf.toString());
     }
     public void receiveTunnelFragmentComplete(long messageId) {
@@ -420,12 +442,13 @@ public class MessageHistory {
         buf.append("Receive fragmented message completely: ").append(messageId);
         addEntry(buf.toString());
     }
-    public void droppedFragmentedMessage(long messageId) {
+    public void droppedFragmentedMessage(long messageId, String status) {
         if (!_doLog) return;
         if (messageId == -1) throw new IllegalArgumentException("why are you -1?");
         StringBuffer buf = new StringBuffer(48);
         buf.append(getPrefix());
         buf.append("Fragmented message dropped: ").append(messageId);
+        buf.append(" ").append(status);
         addEntry(buf.toString());
     }
     public void fragmentMessage(long messageId, int numFragments) {
@@ -436,6 +459,15 @@ public class MessageHistory {
         buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments);
         addEntry(buf.toString());
     }
+    public void fragmentMessage(long messageId, int numFragments, String tunnel) {
+        if (!_doLog) return;
+        if (messageId == -1) throw new IllegalArgumentException("why are you -1?");
+        StringBuffer buf = new StringBuffer(48);
+        buf.append(getPrefix());
+        buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments);
+        buf.append(" on ").append(tunnel);
+        addEntry(buf.toString());
+    }
     public void droppedTunnelDataMessageUnknown(long msgId, long tunnelId) {
         if (!_doLog) return;
         if (msgId == -1) throw new IllegalArgumentException("why are you -1?");


@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
  *
  */
 public class RouterVersion {
-    public final static String ID = "$Revision: 1.188 $ $Date: 2005/04/20 14:15:25 $";
+    public final static String ID = "$Revision: 1.189 $ $Date: 2005/04/20 15:14:19 $";
     public final static String VERSION = "0.5.0.7";
-    public final static long BUILD = 0;
+    public final static long BUILD = 1;
     public static void main(String args[]) {
         System.out.println("I2P Router version: " + VERSION);
         System.out.println("Router ID: " + RouterVersion.ID);


@@ -24,6 +24,7 @@ public class ACKSender implements Runnable {
         _fragments = fragments;
         _transport = transport;
         _builder = new PacketBuilder(_context);
+        _context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", new long[] { 60*1000, 60*60*1000 });
     }
     public void run() {
@@ -32,6 +33,8 @@ public class ACKSender implements Runnable {
             if (peer != null) {
                 List acks = peer.retrieveACKs();
                 if ( (acks != null) && (acks.size() > 0) ) {
+                    _context.statManager().addRateData("udp.sendACKCount", acks.size(), 0);
+                    _context.statManager().getStatLog().addData(peer.getRemoteHostString(), "udp.peer.sendACKCount", acks.size(), 0);
                     UDPPacket ack = _builder.buildACK(peer, acks);
                     if (_log.shouldLog(Log.INFO))
                         _log.info("Sending ACK for " + acks);


@@ -6,6 +6,7 @@ import java.util.List;
 import java.util.Map;
 import net.i2p.router.RouterContext;
+import net.i2p.util.DecayingBloomFilter;
 import net.i2p.util.I2PThread;
 import net.i2p.util.Log;
@@ -28,8 +29,8 @@ public class InboundMessageFragments {
     private List _unsentACKs;
     /** list of messages (InboundMessageState) fully received but not interpreted yet */
     private List _completeMessages;
-    /** list of message IDs (Long) recently received, so we can ignore in flight dups */
-    private List _recentlyCompletedMessages;
+    /** list of message IDs recently received, so we can ignore in flight dups */
+    private DecayingBloomFilter _recentlyCompletedMessages;
     private OutboundMessageFragments _outbound;
     private UDPTransport _transport;
     /** this can be broken down further, but to start, OneBigLock does the trick */
@@ -39,6 +40,8 @@ public class InboundMessageFragments {
     private static final int RECENTLY_COMPLETED_SIZE = 100;
     /** how frequently do we want to send ACKs to a peer? */
     private static final int ACK_FREQUENCY = 200;
+    /** decay the recently completed every 2 minutes */
+    private static final int DECAY_PERIOD = 120*1000;

     public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) {
         _context = ctx;
@@ -46,7 +49,6 @@ public class InboundMessageFragments {
         _inboundMessages = new HashMap(64);
         _unsentACKs = new ArrayList(64);
         _completeMessages = new ArrayList(64);
-        _recentlyCompletedMessages = new ArrayList(RECENTLY_COMPLETED_SIZE);
         _outbound = outbound;
         _transport = transport;
         _context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
@@ -60,6 +62,11 @@ public class InboundMessageFragments {
     public void startup() {
         _alive = true;
+        // may want to extend the DecayingBloomFilter so we can use a smaller
+        // array size (currently its tuned for 10 minute rates for the
+        // messageValidator)
+        _recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 8);
         I2PThread t = new I2PThread(new ACKSender(_context, this, _transport), "UDP ACK sender");
         t.setDaemon(true);
         t.start();
@@ -70,6 +77,9 @@ public class InboundMessageFragments {
     }
     public void shutdown() {
         _alive = false;
+        if (_recentlyCompletedMessages != null)
+            _recentlyCompletedMessages.stopDecaying();
+        _recentlyCompletedMessages = null;
         synchronized (_stateLock) {
             _completeMessages.clear();
             _unsentACKs.clear();
@@ -112,8 +122,15 @@ public class InboundMessageFragments {
         for (int i = 0; i < fragments; i++) {
             Long messageId = new Long(data.readMessageId(i));
-            if (_recentlyCompletedMessages.contains(messageId)) {
+            if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
                 _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
+                from.messageFullyReceived(messageId);
+                if (!_unsentACKs.contains(from))
+                    _unsentACKs.add(from);
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Message received is a dup: " + messageId + " dups: "
+                              + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
+                              + _recentlyCompletedMessages.getInsertedCount());
                 continue;
             }
@@ -132,9 +149,7 @@ public class InboundMessageFragments {
                 messageComplete = true;
                 messages.remove(messageId);
-                while (_recentlyCompletedMessages.size() >= RECENTLY_COMPLETED_SIZE)
-                    _recentlyCompletedMessages.remove(0);
-                _recentlyCompletedMessages.add(messageId);
+                _recentlyCompletedMessages.add(messageId.longValue());
                 _completeMessages.add(state);
@@ -169,12 +184,15 @@ public class InboundMessageFragments {
         long acks[] = data.readACKs();
         if (acks != null) {
             _context.statManager().addRateData("udp.receivedACKs", acks.length, 0);
+            _context.statManager().getStatLog().addData(from.getRemoteHostString(), "udp.peer.receiveACKCount", acks.length, 0);
             for (int i = 0; i < acks.length; i++) {
                 if (_log.shouldLog(Log.INFO))
                     _log.info("Full ACK of message " + acks[i] + " received!");
                 fragments += _outbound.acked(acks[i], from.getRemotePeer());
             }
-            from.messageACKed(fragments * from.getMTU()); // estimated size
+        } else {
+            _log.error("Received ACKs with no acks?! " + data);
         }
     }
     if (data.readECN())
@@ -212,7 +230,8 @@ public class InboundMessageFragments {
         synchronized (_stateLock) {
             for (int i = 0; i < _unsentACKs.size(); i++) {
                 PeerState peer = (PeerState)_unsentACKs.get(i);
-                if (peer.getLastACKSend() + ACK_FREQUENCY <= now) {
+                if ( (peer.getLastACKSend() + ACK_FREQUENCY <= now) ||
+                     (peer.unsentACKThresholdReached()) ) {
                     _unsentACKs.remove(i);
                     peer.setLastACKSend(now);
                     return peer;


@@ -31,6 +31,8 @@ public class OutboundMessageFragments {
     /** which message should we build the next packet out of? */
     private int _nextPacketMessage;
     private PacketBuilder _builder;
+    /** if we can handle more messages explicitly, set this to true */
+    private boolean _allowExcess;

     private static final int MAX_ACTIVE = 64;
     // don't send a packet more than 10 times
@@ -44,12 +46,14 @@ public class OutboundMessageFragments {
         _nextPacketMessage = 0;
         _builder = new PacketBuilder(ctx);
         _alive = true;
+        _allowExcess = false;
         _context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.sendConfirmFragments", "How many fragments are included in a fully ACKed message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.sendConfirmVolley", "How many times did fragments need to be sent before ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.sendFailed", "How many fragments were in a message that couldn't be delivered", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
+        _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the active pool when a new one is added", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
     }
     public void startup() { _alive = true; }
@@ -75,6 +79,8 @@ public class OutboundMessageFragments {
                 return false;
             else if (_activeMessages.size() < MAX_ACTIVE)
                 return true;
+            else if (_allowExcess)
+                return true;
             else
                 _activeMessages.wait();
         }
@@ -90,12 +96,16 @@ public class OutboundMessageFragments {
     public void add(OutNetMessage msg) {
         OutboundMessageState state = new OutboundMessageState(_context);
         boolean ok = state.initialize(msg);
+        state.setPeer(_transport.getPeerState(msg.getTarget().getIdentity().calculateHash()));
         finishMessages();
+        int active = 0;
         synchronized (_activeMessages) {
             if (ok)
                 _activeMessages.add(state);
+            active = _activeMessages.size();
             _activeMessages.notifyAll();
         }
+        _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     }

     /**
@@ -156,6 +166,9 @@ public class OutboundMessageFragments {
         }
     }
+
+    private static final long SECOND_MASK = 1023l;
+
     /**
      * Grab the next packet that we want to send, blocking until one is ready.
      * This is the main driver for the packet scheduler
@@ -208,17 +221,31 @@ public class OutboundMessageFragments {
                               + " remaining"
                               + " for message " + state.getMessageId() + ": " + state);
+                    if (state.justBeganVolley() && (state.getPushCount() > 0) && (state.getFragmentCount() > 1)) {
+                        peer.messageRetransmitted();
+                        if (_log.shouldLog(Log.ERROR))
+                            _log.error("Retransmitting " + state + " to " + peer);
+                    }
                     // for fairness, we move on in a round robin
                     _nextPacketMessage = i + 1;
-                    if (state.getPushCount() != oldVolley) {
+                    if (currentFragment >= state.getFragmentCount() - 1) {
+                        // this is the last fragment
                         _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount());
-                        state.setNextSendTime(now + (1000-(now%1000)) + _context.random().nextInt(4000));
+                        if (state.getPeer() != null) {
+                            int rto = state.getPeer().getRTO() * state.getPushCount();
+                            //_log.error("changed volley, rto=" + rto + " volley="+ state.getPushCount());
+                            state.setNextSendTime(now + rto);
+                        } else {
+                            _log.error("changed volley, unknown peer");
+                            state.setNextSendTime(now + 1000 + _context.random().nextInt(2000));
+                        }
                     } else {
                         if (peer.getSendWindowBytesRemaining() > 0)
                             state.setNextSendTime(now);
                         else
-                            state.setNextSendTime(now + (1000-(now%1000)));
+                            state.setNextSendTime((now + 1024) & ~SECOND_MASK);
                     }
                     break;
                 } else {
@@ -226,7 +253,7 @@ public class OutboundMessageFragments {
                     _log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes()
                               + " available=" + peer.getSendWindowBytesRemaining()
                               + " for message " + state.getMessageId() + ": " + state);
-                    state.setNextSendTime(now + (1000-(now%1000)));
+                    state.setNextSendTime((now + 1024) & ~SECOND_MASK);
                     currentFragment = -1;
                 }
             }
@@ -234,7 +261,7 @@ public class OutboundMessageFragments {
                 long time = state.getNextSendTime();
                 if ( (nextSend < 0) || (time < nextSend) )
                     nextSend = time;
-            }
+            } // end of the for(activeMessages)

             if (currentFragment < 0) {
                 if (nextSend <= 0) {
@@ -248,13 +275,16 @@ public class OutboundMessageFragments {
                         delay = 10;
                     if (delay > 1000)
                         delay = 1000;
+                    _allowExcess = true;
+                    _activeMessages.notifyAll();
                     try {
                         _activeMessages.wait(delay);
                     } catch (InterruptedException ie) {}
                 }
             }
-        }
-    }
+            _allowExcess = false;
+        } // end of the synchronized block
+    } // end of the while (alive && !found)

     if (currentFragment >= 0) {
         if (_log.shouldLog(Log.INFO))
@@ -269,8 +299,8 @@ public class OutboundMessageFragments {
     }

     private static final int SSU_HEADER_SIZE = 46;
-    private static final int UDP_HEADER_SIZE = 8;
-    private static final int IP_HEADER_SIZE = 20;
+    static final int UDP_HEADER_SIZE = 8;
+    static final int IP_HEADER_SIZE = 20;
     /** how much payload data can we shove in there? */
     private static final int fragmentSize(int mtu) {
         return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE;
@@ -309,17 +339,23 @@ public class OutboundMessageFragments {
         }

         if (state != null) {
+            int numSends = state.getMaxSends();
             if (_log.shouldLog(Log.INFO))
                 _log.info("Received ack of " + messageId + " by " + ackedBy.toBase64()
-                          + " after " + state.getLifetime());
+                          + " after " + state.getLifetime() + " and " + numSends + " sends");
             _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
             _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
-            int numSends = state.getMaxSends();
             _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
             if ( (numSends > 1) && (state.getPeer() != null) )
                 state.getPeer().congestionOccurred();
             _transport.succeeded(state.getMessage());
             int numFragments = state.getFragmentCount();
+            if (state.getPeer() != null) {
+                // this adjusts the rtt/rto/window/etc
+                state.getPeer().messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), state.getMaxSends());
+            } else {
+                _log.warn("message acked, but no peer attacked: " + state);
+            }
             state.releaseResources();
             return numFragments;
         } else {


@@ -31,6 +31,7 @@ public class OutboundMessageState {
     private long _nextSendTime;
     private int _pushCount;
     private short _maxSends;
+    private int _nextSendFragment;

     public static final int MAX_FRAGMENTS = 32;
     private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024);
@@ -40,6 +41,7 @@ public class OutboundMessageState {
         _log = _context.logManager().getLog(OutboundMessageState.class);
         _pushCount = 0;
         _maxSends = 0;
+        _nextSendFragment = 0;
     }

     public synchronized boolean initialize(OutNetMessage msg) {
@@ -100,6 +102,7 @@ public class OutboundMessageState {
     public OutNetMessage getMessage() { return _message; }
     public long getMessageId() { return _messageId; }
     public PeerState getPeer() { return _peer; }
+    public void setPeer(PeerState peer) { _peer = peer; }
     public boolean isExpired() {
         return _expiration < _context.clock().now();
     }
@@ -160,6 +163,7 @@ public class OutboundMessageState {
         else
             return _fragmentSends.length;
     }
+    public int getFragmentSize() { return _fragmentSize; }
     /** should we continue sending this fragment? */
     public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; }
     public synchronized int fragmentSize(int fragmentNum) {
@@ -178,6 +182,17 @@ public class OutboundMessageState {
      * @return fragment index, or -1 if all of the fragments were acked
      */
     public int pickNextFragment() {
+        if (true) {
+            int rv = _nextSendFragment;
+            _fragmentSends[rv]++;
+            _maxSends = _fragmentSends[rv];
+            _nextSendFragment++;
+            if (_nextSendFragment >= _fragmentSends.length) {
+                _nextSendFragment = 0;
+                _pushCount++;
+            }
+            return rv;
+        }
         short minValue = -1;
         int minIndex = -1;
         int startOffset = _context.random().nextInt(_fragmentSends.length);
@@ -207,9 +222,9 @@ public class OutboundMessageState {
                 break;
             }
         }
-        if (endOfVolley)
+        if (endOfVolley) {
             _pushCount++;
+        }
         if (_log.shouldLog(Log.DEBUG)) {
             StringBuffer buf = new StringBuffer(64);
@@ -224,6 +239,13 @@ public class OutboundMessageState {
         return minIndex;
     }
+
+    public boolean justBeganVolley() {
+        if (_fragmentSends.length == 1)
+            return true;
+        else
+            return _nextSendFragment == 1;
+    }
+
     /**
      * Write a part of the the message onto the specified buffer.
      *


@@ -113,6 +113,12 @@ public class PeerState {
     private int _mtu;
     /** when did we last check the MTU? */
     private long _mtuLastChecked;
+    /** current round trip time estimate */
+    private int _rtt;
+    /** smoothed mean deviation in the rtt */
+    private int _rttDeviation;
+    /** current retransmission timeout */
+    private int _rto;

     private long _messagesReceived;
     private long _messagesSent;
@@ -120,7 +126,7 @@ public class PeerState {
     private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024;
     private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
-    private static final int DEFAULT_MTU = 1024;
+    private static final int DEFAULT_MTU = 1492;

     public PeerState(I2PAppContext ctx) {
         _context = ctx;
@@ -153,6 +159,9 @@ public class PeerState {
         _mtu = DEFAULT_MTU;
         _mtuLastChecked = -1;
         _lastACKSend = -1;
+        _rtt = 1000;
+        _rttDeviation = _rtt;
+        _rto = 6000;
         _messagesReceived = 0;
         _messagesSent = 0;
     }
@@ -328,6 +337,7 @@ public class PeerState {
             _sendWindowBytesRemaining = _sendWindowBytes;
             _lastSendRefill = now;
         }
+        //if (true) return true;
         if (size <= _sendWindowBytesRemaining) {
             _sendWindowBytesRemaining -= size;
             _lastSendTime = now;
@@ -393,15 +403,22 @@ public class PeerState {
     /** pull off the ACKs (Long) to send to the peer */
     public List retrieveACKs() {
         List rv = null;
+        int threshold = countMaxACKs();
         synchronized (_currentACKs) {
+            if (_currentACKs.size() < threshold) {
                 rv = new ArrayList(_currentACKs);
                 _currentACKs.clear();
+            } else {
+                rv = new ArrayList(threshold);
+                for (int i = 0; i < threshold; i++)
+                    rv.add(_currentACKs.remove(0));
+            }
         }
         return rv;
     }

     /** we sent a message which was ACKed containing the given # of bytes */
-    public void messageACKed(int bytesACKed) {
+    public void messageACKed(int bytesACKed, long lifetime, int numSends) {
         _consecutiveSendingSecondsWithoutACKs = 0;
         if (_sendWindowBytes <= _slowStartThreshold) {
             _sendWindowBytes += bytesACKed;
@@ -414,8 +431,36 @@ public class PeerState {
             _sendWindowBytes = MAX_SEND_WINDOW_BYTES;
         _lastReceiveTime = _context.clock().now();
         _messagesSent++;
+        if (numSends <= 2)
+            recalculateTimeouts(lifetime);
+        else
+            _log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
     }
+
+    /** adjust the tcp-esque timeouts */
+    private void recalculateTimeouts(long lifetime) {
+        _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation));
+        _rtt = (int)((float)_rtt*(0.9f) + (0.1f)*(float)lifetime);
+        _rto = _rtt + (_rttDeviation<<2);
+        if (_log.shouldLog(Log.WARN))
+            _log.warn("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
+                      + " rttDev=" + _rttDeviation + " rto=" + _rto);
+        if (_rto < 1000)
+            _rto = 1000;
+        if (_rto > 5000)
+            _rto = 5000;
+    }
+
+    /** we are resending a packet, so lets jack up the rto */
+    public void messageRetransmitted() {
+        //_rto *= 2;
+    }
+    /** how long does it usually take to get a message ACKed? */
+    public int getRTT() { return _rtt; }
+    /** how soon should we retransmit an unacked packet? */
+    public int getRTO() { return _rto; }
+    /** how skewed are the measured RTTs? */
+    public long getRTTDeviation() { return _rttDeviation; }

     public long getMessagesSent() { return _messagesSent; }
     public long getMessagesReceived() { return _messagesReceived; }
@@ -435,6 +480,25 @@ public class PeerState {
     /** when did we last send an ACK to the peer? */
     public long getLastACKSend() { return _lastACKSend; }
     public void setLastACKSend(long when) { _lastACKSend = when; }
+    public boolean unsentACKThresholdReached() {
+        int threshold = countMaxACKs();
+        synchronized (_currentACKs) {
+            return _currentACKs.size() >= threshold;
+        }
+    }
+    private int countMaxACKs() {
+        return (_mtu
+                - OutboundMessageFragments.IP_HEADER_SIZE
+                - OutboundMessageFragments.UDP_HEADER_SIZE
+                - UDPPacket.IV_SIZE
+                - UDPPacket.MAC_SIZE
+                - 1 // type flag
+                - 4 // timestamp
+                - 1 // data flag
+                - 1 // # ACKs
+                - 16 // padding safety
+               ) / 4;
+    }

     public String getRemoteHostString() { return _remoteHostString; }


@@ -24,7 +24,7 @@ import net.i2p.util.Log;
  */
 public class UDPPacket {
     private I2PAppContext _context;
-    private Log _log;
+    private static Log _log;
     private DatagramPacket _packet;
     private short _priority;
     private long _initializeTime;
@@ -35,6 +35,7 @@ public class UDPPacket {
     private static final List _packetCache;
     static {
         _packetCache = new ArrayList(256);
+        _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class);
     }
     private static final boolean CACHE = false;
@@ -67,7 +68,6 @@ public class UDPPacket {
     private UDPPacket(I2PAppContext ctx) {
         _context = ctx;
-        _log = ctx.logManager().getLog(UDPPacket.class);
         _dataBuf = _dataCache.acquire();
         _data = _dataBuf.getData();
         _packet = new DatagramPacket(_data, MAX_PACKET_SIZE);


@@ -86,7 +86,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
     private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
     /** should we flood all UDP peers with the configured rate? */
-    private static final boolean SHOULD_FLOOD_PEERS = false;
+    private static final boolean SHOULD_FLOOD_PEERS = true;

     public UDPTransport(RouterContext ctx) {
         super(ctx);
@@ -492,6 +492,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
         buf.append(" <tr><td><b>Peer</b></td><td><b>Location</b></td>\n");
         buf.append(" <td><b>Last send</b></td><td><b>Last recv</b></td>\n");
         buf.append(" <td><b>Lifetime</b></td><td><b>cwnd</b></td><td><b>ssthresh</b></td>\n");
+        buf.append(" <td><b>rtt</b></td><td><b>dev</b></td><td><b>rto</b></td>\n");
         buf.append(" <td><b>Sent</b></td><td><b>Received</b></td>\n");
         buf.append(" </tr>\n");
         out.write(buf.toString());
@@ -534,11 +535,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
             buf.append("</td>");
             buf.append("<td>");
-            buf.append(peer.getSendWindowBytes());
+            buf.append(peer.getSendWindowBytes()/1024);
+            buf.append("K</td>");
+            buf.append("<td>");
+            buf.append(peer.getSlowStartThreshold()/1024);
+            buf.append("K</td>");
+            buf.append("<td>");
+            buf.append(peer.getRTT());
             buf.append("</td>");
             buf.append("<td>");
-            buf.append(peer.getSlowStartThreshold());
+            buf.append(peer.getRTTDeviation());
+            buf.append("</td>");
+            buf.append("<td>");
+            buf.append(peer.getRTO());
             buf.append("</td>");
             buf.append("<td>");


@@ -52,9 +52,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
             // loops because sends may be partial
             TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(0);
             send(pending, 0, 0, sender, rec);
-            if (msg.getOffset() >= msg.getData().length)
+            if (msg.getOffset() >= msg.getData().length) {
+                notePreprocessing(msg.getMessageId(), msg.getFragmentNumber());
                 pending.remove(0);
+            }
         }
         return false;
     }
@@ -84,11 +86,14 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
                     _log.info("Allocated=" + allocated + " so we sent " + (i+1)
                               + " (last complete? " + (msg.getOffset() >= msg.getData().length) + ")");
-                for (int j = 0; j < i; j++)
-                    pending.remove(0);
+                for (int j = 0; j < i; j++) {
+                    TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
+                    notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
+                }
                 if (msg.getOffset() >= msg.getData().length) {
                     // ok, this last message fit perfectly, remove it too
-                    pending.remove(0);
+                    TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
+                    notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
                 }
                 if (i > 0)
                     _context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0);
@@ -113,7 +118,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
                 _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0);
                 send(pending, 0, pending.size()-1, sender, rec);
-                pending.clear();
+                while (pending.size() > 0) {
+                    TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
+                    notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
+                }
                 _pendingSince = 0;
                 return false;
             } else {


@@ -11,6 +11,7 @@ import net.i2p.router.RouterContext;
 public class BatchedRouterPreprocessor extends BatchedPreprocessor {
     private RouterContext _routerContext;
     private TunnelCreatorConfig _config;
+    private HopConfig _hopConfig;

     /**
      * How frequently should we flush non-full messages, in milliseconds
@@ -20,13 +21,18 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
     public static final int DEFAULT_BATCH_FREQUENCY = 500;

     public BatchedRouterPreprocessor(RouterContext ctx) {
-        this(ctx, null);
+        this(ctx, (HopConfig)null);
     }
     public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) {
         super(ctx);
         _routerContext = ctx;
         _config = cfg;
     }
+    public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) {
+        super(ctx);
+        _routerContext = ctx;
+        _hopConfig = cfg;
+    }

     /** how long should we wait before flushing */
     protected long getSendDelay() {
@@ -50,6 +56,9 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
     }
     protected void notePreprocessing(long messageId, int numFragments) {
-        _routerContext.messageHistory().fragmentMessage(messageId, numFragments);
+        if (_config != null)
+            _routerContext.messageHistory().fragmentMessage(messageId, numFragments, _config.toString());
+        else
+            _routerContext.messageHistory().fragmentMessage(messageId, numFragments, _hopConfig.toString());
     }
 }


@@ -263,7 +263,7 @@ public class FragmentHandler {
             SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
             receiveComplete(msg);
         } else {
-            noteReception(msg.getMessageId(), 0);
+            noteReception(msg.getMessageId(), 0, msg.toString());
         }
         if (isNew && fragmented && !msg.isComplete()) {
@@ -325,7 +325,7 @@ public class FragmentHandler {
             _context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
             receiveComplete(msg);
         } else {
-            noteReception(msg.getMessageId(), fragmentNum);
+            noteReception(msg.getMessageId(), fragmentNum, msg.toString());
         }
         if (isNew && !msg.isComplete()) {
@@ -359,9 +359,9 @@ public class FragmentHandler {
         }
     }
-    protected void noteReception(long messageId, int fragmentId) {}
+    protected void noteReception(long messageId, int fragmentId, String status) {}
     protected void noteCompletion(long messageId) {}
-    protected void noteFailure(long messageId) {}
+    protected void noteFailure(long messageId, String status) {}
     /**
      * Receive messages out of the tunnel endpoint.  There should be a single
@@ -393,7 +393,7 @@ public class FragmentHandler {
             }
             if (removed && !_msg.getReleased()) {
                 _failed++;
-                noteFailure(_msg.getMessageId());
+                noteFailure(_msg.getMessageId(), _msg.toString());
                 if (_log.shouldLog(Log.WARN))
                     _log.warn("Dropped failed fragmented message: " + _msg);
                 _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());

View File

@@ -84,7 +84,7 @@ public class FragmentedMessage {
                        + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid()));
         _fragments[fragmentNum] = ba;
-        _lastReceived = isLast;
+        _lastReceived = _lastReceived || isLast;
         if (fragmentNum > _highFragmentNum)
             _highFragmentNum = fragmentNum;
         if (isLast && fragmentNum <= 0)
@@ -119,7 +119,7 @@ public class FragmentedMessage {
             _log.debug("fragment[0/" + offset + "/" + length + "]: "
                        + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid()));
         _fragments[0] = ba;
-        _lastReceived = isLast;
+        _lastReceived = _lastReceived || isLast;
         _toRouter = toRouter;
         _toTunnel = toTunnel;
         if (_highFragmentNum < 0)

View File

@@ -1,25 +1,34 @@
 package net.i2p.router.tunnel;
 import net.i2p.router.RouterContext;
+import net.i2p.util.Log;
 /**
  * Minor extension to allow message history integration
  */
 public class RouterFragmentHandler extends FragmentHandler {
     private RouterContext _routerContext;
+    private Log _log;
     public RouterFragmentHandler(RouterContext context, DefragmentedReceiver receiver) {
         super(context, receiver);
         _routerContext = context;
+        _log = context.logManager().getLog(RouterFragmentHandler.class);
     }
-    protected void noteReception(long messageId, int fragmentId) {
-        _routerContext.messageHistory().receiveTunnelFragment(messageId, fragmentId);
+    protected void noteReception(long messageId, int fragmentId, String status) {
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Received fragment " + fragmentId + " for message " + messageId + ": " + status);
+        _routerContext.messageHistory().receiveTunnelFragment(messageId, fragmentId, status);
     }
     protected void noteCompletion(long messageId) {
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Received complete message " + messageId);
         _routerContext.messageHistory().receiveTunnelFragmentComplete(messageId);
     }
-    protected void noteFailure(long messageId) {
-        _routerContext.messageHistory().droppedFragmentedMessage(messageId);
+    protected void noteFailure(long messageId, String status) {
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Dropped message " + messageId + ": " + status);
+        _routerContext.messageHistory().droppedFragmentedMessage(messageId, status);
     }
 }

View File

@@ -225,6 +225,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
         offset += payloadLength;
         msg.setOffset(msg.getOffset() + payloadLength);
+        if (fragmented)
             msg.incrementFragmentNumber();
         return offset;
     }
@@ -264,6 +265,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
         offset += payloadLength;
+        if (!isLast)
             msg.incrementFragmentNumber();
         msg.setOffset(msg.getOffset() + payloadLength);
         return offset;

View File

@@ -107,8 +107,11 @@ public class TunnelDispatcher implements Service {
                                          new long[] { 60*10*1000l, 60*60*1000l, 24*60*60*1000l });
     }
-    private TunnelGateway.QueuePreprocessor createPreprocessor() {
-        return createPreprocessor(null);
+    private TunnelGateway.QueuePreprocessor createPreprocessor(HopConfig cfg) {
+        if (true)
+            return new BatchedRouterPreprocessor(_context, cfg);
+        else
+            return new TrivialRouterPreprocessor(_context);
     }
     private TunnelGateway.QueuePreprocessor createPreprocessor(TunnelCreatorConfig cfg) {
         if (true)
@@ -133,6 +136,7 @@ public class TunnelDispatcher implements Service {
                 _outboundGateways.put(outId, gw);
             }
             _context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0);
+            _context.messageHistory().tunnelJoined("outbound", cfg);
         } else {
             TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
             TunnelId outId = cfg.getConfig(0).getSendTunnel();
@@ -140,6 +144,7 @@
                 _outboundGateways.put(outId, gw);
             }
             _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0);
+            _context.messageHistory().tunnelJoined("outboundZeroHop", cfg);
         }
     }
     /**
@@ -156,6 +161,7 @@
                 _participants.put(recvId, participant);
             }
             _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0);
+            _context.messageHistory().tunnelJoined("inboundEndpoint", cfg);
         } else {
             TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
             TunnelId recvId = cfg.getConfig(0).getReceiveTunnel();
@@ -163,6 +169,7 @@
                 _inboundGateways.put(recvId, gw);
             }
             _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0);
+            _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg);
         }
     }
@@ -183,6 +190,7 @@
             _participatingConfig.put(recvId, cfg);
             numParticipants = _participatingConfig.size();
         }
+        _context.messageHistory().tunnelJoined("participant", cfg);
         _context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
         _context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
         if (cfg.getExpiration() > _lastParticipatingExpiration)
@@ -206,6 +214,7 @@
             _participatingConfig.put(recvId, cfg);
             numParticipants = _participatingConfig.size();
         }
+        _context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
         _context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
         _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0);
@@ -221,7 +230,7 @@
     public void joinInboundGateway(HopConfig cfg) {
         if (_log.shouldLog(Log.INFO))
             _log.info("Joining as inbound gateway: " + cfg);
-        TunnelGateway.QueuePreprocessor preproc = createPreprocessor();
+        TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg);
         TunnelGateway.Sender sender = new InboundSender(_context, cfg);
         TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg);
         TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
@@ -234,6 +243,7 @@
             _participatingConfig.put(recvId, cfg);
             numParticipants = _participatingConfig.size();
         }
+        _context.messageHistory().tunnelJoined("inboundGateway", cfg);
         _context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
         _context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0);
@@ -346,6 +356,8 @@
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("dispatch to participant " + participant + ": " + msg.getUniqueId() + " from "
                            + recvFrom.toBase64().substring(0,4));
+            _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel "
+                                                       + msg.getTunnelId().getTunnelId() + " as participant");
             participant.dispatch(msg, recvFrom);
             _context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0);
         } else {
@@ -358,7 +370,10 @@
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("dispatch where we are the outbound endpoint: " + endpoint + ": "
                            + msg + " from " + recvFrom.toBase64().substring(0,4));
+            _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel "
+                                                       + msg.getTunnelId().getTunnelId() + " as outbound endpoint");
             endpoint.dispatch(msg, recvFrom);
             _context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0);
         } else {
             _context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId());
@@ -397,6 +412,8 @@
                           + msg.getMessage().getClass().getName());
                 return;
             }
+            _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + "/" + msg.getMessage().getUniqueId() + " on tunnel "
+                                                       + msg.getTunnelId().getTunnelId() + " as inbound gateway");
             gw.add(msg);
             _context.statManager().addRateData("tunnel.dispatchInbound", 1, 0);
         } else {
@@ -464,6 +481,9 @@
                           + (before-msg.getMessageExpiration()) + "ms ago? "
                           + msg, new Exception("cause"));
             }
+            _context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + " on tunnel "
+                                                       + outboundTunnel + "/" + targetTunnel + " to "
+                                                       + targetPeer + " as outbound gateway");
             gw.add(msg, targetPeer, targetTunnel);
             if (targetTunnel == null)
                 _context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1, 0);