diff --git a/history.txt b/history.txt
index 4974fa3da..e23cb9cf0 100644
--- a/history.txt
+++ b/history.txt
@@ -1,3 +1,13 @@
+2009-12-22 zzz
+ * Tunnels:
+ - Do RED dropping before the IBGW fragmenter, not after
+ - Change batch time to 250ms for IBGWs (was 100ms)
+ - Change batch time to 150ms for exploratory OBGWs (was 100ms)
+ - Start a new message in the fragmenter if almost full
+ - Fix a major, longstanding synchronization bug in the FragmentHandler
+ which led to corrupt messages at the endpoints
+ - More cleanups and comments
+
2009-12-20 zzz
* Console:
- Fix status to show a disconnected network error rather than
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 1440f81c0..217d41d54 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
- public final static long BUILD = 10;
+ public final static long BUILD = 11;
/** for example "-test" */
public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;
diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java
index 30fbbdd27..cfaabcfcb 100644
--- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java
+++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java
@@ -67,6 +67,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
}
+ /** 1003 */
private static final int FULL_SIZE = PREPROCESSED_SIZE
- IV_SIZE
- 1 // 0x00 ending the padding
@@ -76,11 +77,28 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
/* not final or private so the test code can adjust */
static long DEFAULT_DELAY = 100;
- /** Wait up to this long before sending (flushing) a small tunnel message */
+ /**
+ * Wait up to this long before sending (flushing) a small tunnel message
+ * Warning - overridden in BatchedRouterPreprocessor
+ */
protected long getSendDelay() { return DEFAULT_DELAY; }
- /** if we have 50 messages queued that are too small, flush them anyway */
- private static final int FORCE_BATCH_FLUSH = 50;
+ /**
+ * if we have this many messages queued that are too small, flush them anyway
+ * Even small messages take up about 200 bytes or so.
+ */
+ private static final int FORCE_BATCH_FLUSH = 5;
+
+ /** If we have this much allocated, flush anyway.
+ * Tune this to trade off padding vs. fragmentation.
+ * The lower the value, the more we are willing to send off
+ * a tunnel msg that isn't full so the next message can start
+ * in a new tunnel msg to minimize fragmentation.
+ *
+ * This should be at most FULL_SIZE - (39 + a few), since
+ * you want to at least fit in the instructions and a few bytes.
+ */
+ private static final int FULL_ENOUGH_SIZE = (FULL_SIZE * 80) / 100;
/** how long do we want to wait before flushing */
@Override
@@ -100,6 +118,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
/* See TunnelGateway.QueuePreprocessor for Javadoc */
@Override
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
+ if (_log.shouldLog(Log.INFO))
+ display(0, pending, "Starting");
StringBuilder timingBuf = null;
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Preprocess queue with " + pending.size() + " to send");
@@ -155,9 +175,10 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
_context.statManager().addRateData("tunnel.batchFullFragments", 1, 0);
long afterSend = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO))
- _log.info("Allocated=" + allocated + " so we sent " + (i+1)
- + " (last complete? " + (msg.getOffset() >= msg.getData().length)
- + ", off=" + msg.getOffset() + ", count=" + pending.size() + ")");
+ display(allocated, pending, "Sent the message with " + (i+1));
+ //_log.info(_name + ": Allocated=" + allocated + "B, Sent " + (i+1)
+ // + " msgs (last complete? " + (msg.getOffset() >= msg.getData().length)
+ // + ", off=" + msg.getOffset() + ", pending=" + pending.size() + ")");
// Remove what we sent from the pending queue
for (int j = 0; j < i; j++) {
@@ -197,7 +218,6 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append(".");
} // for
- long afterCleared = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO))
display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
long afterDisplayed = System.currentTimeMillis();
@@ -205,7 +225,12 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
// After going through the entire pending list, we have only a partial message.
// We might flush it or might not, but we are returning either way.
- if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) {
+ if ( (pending.size() > FORCE_BATCH_FLUSH) || // enough msgs - or
+ ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) || // time to flush - or
+ (allocated >= FULL_ENOUGH_SIZE)) { // full enough
+ //(pending.get(0).getFragmentNumber() > 0)) { // don't delay anybody's last fragment,
+ // // which would be the first fragment in the message
+
// not even a full message, but we want to flush it anyway
if (pending.size() > 1)
@@ -215,22 +240,24 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
send(pending, 0, pending.size()-1, sender, rec);
_context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0);
- // Remove everything in the message from the pending queue
+ // Remove everything in the outgoing message from the pending queue
int beforeSize = pending.size();
- for (int i = 0; i < pending.size(); i++) {
- TunnelGateway.Pending cur = pending.get(i);
- if (cur.getOffset() >= cur.getData().length) {
- pending.remove(i);
- notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining");
- _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
- _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
- i--;
- }
+ for (int i = 0; i < beforeSize; i++) {
+ TunnelGateway.Pending cur = pending.get(0);
+ if (cur.getOffset() < cur.getData().length)
+ break;
+ pending.remove(0);
+ notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining");
+ _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
+ _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
+
if (pending.size() > 0) {
+ // rare
_pendingSince = _context.clock().now();
_context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize);
- display(allocated, pending, "flushed, some remain");
+ if (_log.shouldLog(Log.INFO))
+ display(allocated, pending, "flushed, some remain");
if (timingBuf != null) {
timingBuf.append(" flushed, some remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@@ -239,12 +266,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
}
return true;
} else {
- long delayAmount = _context.clock().now() - _pendingSince;
- _pendingSince = 0;
+ long delayAmount = 0;
+ if (_pendingSince > 0) {
+ delayAmount = _context.clock().now() - _pendingSince;
+ _pendingSince = 0;
+ }
if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
if (_log.shouldLog(Log.INFO))
- display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount);
+ display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount + "ms");
if (timingBuf != null) {
timingBuf.append(" flushed, none remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@@ -262,7 +292,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
// not yet time to send the delayed flush
- display(allocated, pending, "dont flush");
+ if (_log.shouldLog(Log.INFO))
+ display(allocated, pending, "dont flush");
if (timingBuf != null) {
timingBuf.append(" dont flush (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@@ -293,20 +324,25 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
return false;
}
+ /*
+ * Only if Log.INFO
+ *
+ * title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc.
+ */
private void display(long allocated, List pending, String title) {
if (_log.shouldLog(Log.INFO)) {
long highestDelay = 0;
- StringBuilder buf = new StringBuilder();
+ StringBuilder buf = new StringBuilder(128);
buf.append(_name).append(": ");
buf.append(title);
- buf.append(" allocated: ").append(allocated);
+ buf.append(" - allocated: ").append(allocated);
buf.append(" pending: ").append(pending.size());
if (_pendingSince > 0)
buf.append(" delay: ").append(getDelayAmount(false));
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending curPending = pending.get(i);
- buf.append(" pending[").append(i).append("]: ");
- buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/');
+ buf.append(" [").append(i).append("]:");
+ buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/');
buf.append(curPending.getLifetime());
if (curPending.getLifetime() > highestDelay)
highestDelay = curPending.getLifetime();
diff --git a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java
index 6af65e88e..71d80b011 100644
--- a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java
+++ b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java
@@ -14,68 +14,96 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
protected RouterContext _routerContext;
private TunnelCreatorConfig _config;
protected HopConfig _hopConfig;
+ private final long _sendDelay;
/**
* How frequently should we flush non-full messages, in milliseconds
+ * This goes in I2CP custom options for the pool.
+ * Only applies to OBGWs.
*/
public static final String PROP_BATCH_FREQUENCY = "batchFrequency";
+ /** This goes in router advanced config */
public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency";
- public static final int DEFAULT_BATCH_FREQUENCY = 100;
+ /** for client OBGWs only (our data) */
+ public static final int OB_CLIENT_BATCH_FREQ = 100;
+ /** for exploratory OBGWs only (our tunnel tests and build messages) */
+ public static final int OB_EXPL_BATCH_FREQ = 150;
+ /** for IBGWs for efficiency (not our data) */
+ public static final int DEFAULT_BATCH_FREQUENCY = 250;
- public BatchedRouterPreprocessor(RouterContext ctx) {
- this(ctx, (HopConfig)null);
- }
+ /** for OBGWs */
public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) {
super(ctx, getName(cfg));
_routerContext = ctx;
_config = cfg;
+ _sendDelay = initialSendDelay();
}
+
+ /** for IBGWs */
public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) {
super(ctx, getName(cfg));
_routerContext = ctx;
_hopConfig = cfg;
+ _sendDelay = initialSendDelay();
}
private static String getName(HopConfig cfg) {
- if (cfg == null) return "[unknown]";
+ if (cfg == null) return "IB??";
if (cfg.getReceiveTunnel() != null)
- return cfg.getReceiveTunnel().getTunnelId() + "";
+ return "IB " + cfg.getReceiveTunnel().getTunnelId();
else if (cfg.getSendTunnel() != null)
- return cfg.getSendTunnel().getTunnelId() + "";
+ return "IB " + cfg.getSendTunnel().getTunnelId();
else
- return "[n/a]";
+ return "IB??";
}
private static String getName(TunnelCreatorConfig cfg) {
- if (cfg == null) return "[unknown]";
+ if (cfg == null) return "OB??";
if (cfg.getReceiveTunnelId(0) != null)
- return cfg.getReceiveTunnelId(0).getTunnelId() + "";
+ return "OB " + cfg.getReceiveTunnelId(0).getTunnelId();
else if (cfg.getSendTunnelId(0) != null)
- return cfg.getSendTunnelId(0).getTunnelId() + "";
+ return "OB " + cfg.getSendTunnelId(0).getTunnelId();
else
- return "[n/a]";
+ return "OB??";
}
- /** how long should we wait before flushing */
+ /**
+ * how long should we wait before flushing
+ */
@Override
- protected long getSendDelay() {
- String freq = null;
+ protected long getSendDelay() { return _sendDelay; }
+
+ /*
+ * Extend the batching time for exploratory OBGWs, they have a lot of small
+ * tunnel test messages, and build messages that don't fit perfectly.
+ * And these are not as delay-sensitive.
+ *
+ * We won't pick up config changes after the preprocessor is created,
+ * but a preprocessor lifetime is only 10 minutes, so just wait...
+ */
+ private long initialSendDelay() {
if (_config != null) {
Properties opts = _config.getOptions();
- if (opts != null)
- freq = opts.getProperty(PROP_BATCH_FREQUENCY);
- }
- if (freq == null)
- freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY);
-
- if (freq != null) {
- try {
- return Integer.parseInt(freq);
- } catch (NumberFormatException nfe) {
- return DEFAULT_BATCH_FREQUENCY;
+ if (opts != null) {
+ String freq = opts.getProperty(PROP_BATCH_FREQUENCY);
+ if (freq != null) {
+ try {
+ return Integer.parseInt(freq);
+ } catch (NumberFormatException nfe) {}
+ }
}
}
- return DEFAULT_BATCH_FREQUENCY;
+
+ int def;
+ if (_config != null) {
+ if (_config.getDestination() != null)
+ def = OB_CLIENT_BATCH_FREQ;
+ else
+ def = OB_EXPL_BATCH_FREQ;
+ } else {
+ def = DEFAULT_BATCH_FREQUENCY;
+ }
+ return _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY, def);
}
@Override
diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java
index f294d5c78..6e4c00e1c 100644
--- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java
+++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java
@@ -39,29 +39,32 @@ data into the raw tunnel payload:
a series of zero or more { instructions, message } pairs
+Note that the padding, if any, must be before the instruction/message pairs.
+there is no provision for padding at the end.
+
The instructions are encoded with a single control byte, followed by any
necessary additional information. The first bit in that control byte determines
how the remainder of the header is interpreted - if it is not set, the message
is either not fragmented or this is the first fragment in the message. If it is
set, this is a follow on fragment.
-With the first bit being 0, the instructions are:
+With the first (leftmost or MSB) bit being 0, the instructions are:
- 1 byte control byte:
bit 0: is follow on fragment? (1 = true, 0 = false, must be 0)
bits 1-2: delivery type
(0x0 = LOCAL, 0x01 = TUNNEL, 0x02 = ROUTER)
- bit 3: delay included? (1 = true, 0 = false)
+ bit 3: delay included? (1 = true, 0 = false) (unimplemented)
bit 4: fragmented? (1 = true, 0 = false)
- bit 5: extended options? (1 = true, 0 = false)
+ bit 5: extended options? (1 = true, 0 = false) (unimplemented)
bits 6-7: reserved
- if the delivery type was TUNNEL, a 4 byte tunnel ID
- if the delivery type was TUNNEL or ROUTER, a 32 byte router hash
-- if the delay included flag is true, a 1 byte value:
+
- if the delay included flag is true, a 1 byte value (unimplemented):
bit 0: type (0 = strict, 1 = randomized)
bits 1-7: delay exponent (2^value minutes)
- if the fragmented flag is true, a 4 byte message ID
-- if the extended options flag is true:
+
- if the extended options flag is true (unimplemented):
= a 1 byte option size (in bytes)
= that many bytes
- 2 byte size of the I2NP message or this fragment
@@ -85,10 +88,11 @@ preprocessed payload must be padded to a multiple of 16 bytes.
public class FragmentHandler {
private I2PAppContext _context;
private Log _log;
- private final Map _fragmentedMessages;
+ private final Map _fragmentedMessages;
private DefragmentedReceiver _receiver;
private int _completed;
private int _failed;
+ private static final long[] RATES = { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 };
/** don't wait more than 60s to defragment the partial message */
static long MAX_DEFRAGMENT_TIME = 60*1000;
@@ -97,18 +101,18 @@ public class FragmentHandler {
public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) {
_context = context;
_log = context.logManager().getLog(FragmentHandler.class);
- _fragmentedMessages = new HashMap(4);
+ _fragmentedMessages = new HashMap(8);
_receiver = receiver;
_context.statManager().createRateStat("tunnel.smallFragments", "How many pad bytes are in small fragments?",
- "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
+ "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fullFragments", "How many tunnel messages use the full data area?",
- "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
+ "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fragmentedComplete", "How many fragments were in a completely received message?",
- "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
+ "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fragmentedDropped", "How many fragments were in a partially received yet failed message?",
- "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
+ "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.corruptMessage", "How many corrupted messages arrived?",
- "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 });
+ "Tunnels", RATES);
}
/**
@@ -233,7 +237,7 @@ public class FragmentHandler {
+ " != " + Base64.encode(v.getData(), 0, 4));
_log.warn("No matching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1)
+ " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd + ' '
- + Base64.encode(preprocessed, offset, length));
+ + Base64.encode(preprocessed, offset, length), new Exception("trace"));
}
}
@@ -254,11 +258,12 @@ public class FragmentHandler {
static final byte MASK_TYPE = (byte)(3 << 5);
/** is this the first of a fragmented message? */
static final byte MASK_FRAGMENTED = (byte)(1 << 3);
- /** are there follow up headers? */
+ /** are there follow up headers? UNIMPLEMENTED */
static final byte MASK_EXTENDED = (byte)(1 << 2);
/** for subsequent fragments, which bits contain the fragment #? */
private static final int MASK_FRAGMENT_NUM = (byte)((1 << 7) - 2); // 0x7E;
+ /** LOCAL isn't explicitly used anywhere, because the code knows that it is 0 */
static final short TYPE_LOCAL = 0;
static final short TYPE_TUNNEL = 1;
static final short TYPE_ROUTER = 2;
@@ -268,8 +273,8 @@ public class FragmentHandler {
*/
private int receiveFragment(byte preprocessed[], int offset, int length) {
if (_log.shouldLog(Log.DEBUG))
- _log.debug("CONTROL: " + Integer.toHexString(preprocessed[offset]) + " / "
- + "/" + Base64.encode(preprocessed, offset, 1) + " at offset " + offset);
+ _log.debug("CONTROL: 0x" + Integer.toHexString(preprocessed[offset] & 0xff) +
+ " at offset " + offset);
if (0 == (preprocessed[offset] & MASK_IS_SUBSEQUENT))
return receiveInitialFragment(preprocessed, offset, length);
else
@@ -330,42 +335,48 @@ public class FragmentHandler {
int size = (int)DataHelper.fromLong(preprocessed, offset, 2);
offset += 2;
- boolean isNew = false;
FragmentedMessage msg = null;
if (fragmented) {
synchronized (_fragmentedMessages) {
- msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId));
+ msg = _fragmentedMessages.get(new Long(messageId));
if (msg == null) {
msg = new FragmentedMessage(_context);
_fragmentedMessages.put(new Long(messageId), msg);
- isNew = true;
}
}
} else {
msg = new FragmentedMessage(_context);
}
- boolean ok = msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId);
- if (!ok) return -1;
- if (msg.isComplete()) {
- if (fragmented) {
- synchronized (_fragmentedMessages) {
- _fragmentedMessages.remove(new Long(messageId));
+ if (fragmented) {
+ // synchronized is required, fragments may be arriving in different threads
+ synchronized(msg) {
+ boolean ok = msg.receive(messageId, preprocessed, offset, size, false, router, tunnelId);
+ if (!ok) return -1;
+ if (msg.isComplete()) {
+ synchronized (_fragmentedMessages) {
+ _fragmentedMessages.remove(new Long(messageId));
+ }
+ if (msg.getExpireEvent() != null)
+ SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
+ receiveComplete(msg);
+ } else {
+ noteReception(msg.getMessageId(), 0, msg);
+ if (msg.getExpireEvent() == null) {
+ RemoveFailed evt = new RemoveFailed(msg);
+ msg.setExpireEvent(evt);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
+ SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
+ }
}
}
- if (msg.getExpireEvent() != null)
- SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
+ } else {
+ // synchronized not required if !fragmented
+ boolean ok = msg.receive(messageId, preprocessed, offset, size, true, router, tunnelId);
+ if (!ok) return -1;
+ // always complete, never an expire event
receiveComplete(msg);
- } else {
- noteReception(msg.getMessageId(), 0, msg);
- }
-
- if (isNew && fragmented && !msg.isComplete()) {
- RemoveFailed evt = new RemoveFailed(msg);
- msg.setExpireEvent(evt);
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
- SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
offset += size;
@@ -397,38 +408,38 @@ public class FragmentHandler {
throw new RuntimeException("Preprocessed message was invalid [messageId =" + messageId + " size="
+ size + " offset=" + offset + " fragment=" + fragmentNum);
- boolean isNew = false;
FragmentedMessage msg = null;
synchronized (_fragmentedMessages) {
- msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId));
+ msg = _fragmentedMessages.get(new Long(messageId));
if (msg == null) {
msg = new FragmentedMessage(_context);
_fragmentedMessages.put(new Long(messageId), msg);
- isNew = true;
}
}
- boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast);
- if (!ok) return -1;
-
- if (msg.isComplete()) {
- synchronized (_fragmentedMessages) {
- _fragmentedMessages.remove(new Long(messageId));
+ // synchronized is required, fragments may be arriving in different threads
+ synchronized(msg) {
+ boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast);
+ if (!ok) return -1;
+
+ if (msg.isComplete()) {
+ synchronized (_fragmentedMessages) {
+ _fragmentedMessages.remove(new Long(messageId));
+ }
+ if (msg.getExpireEvent() != null)
+ SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
+ _context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
+ receiveComplete(msg);
+ } else {
+ noteReception(msg.getMessageId(), fragmentNum, msg);
+ if (msg.getExpireEvent() == null) {
+ RemoveFailed evt = new RemoveFailed(msg);
+ msg.setExpireEvent(evt);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
+ SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
+ }
}
- if (msg.getExpireEvent() != null)
- SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
- _context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
- receiveComplete(msg);
- } else {
- noteReception(msg.getMessageId(), fragmentNum, msg);
- }
-
- if (isNew && !msg.isComplete()) {
- RemoveFailed evt = new RemoveFailed(msg);
- msg.setExpireEvent(evt);
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
- SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
offset += size;
@@ -449,8 +460,8 @@ public class FragmentHandler {
if (data == null)
throw new I2NPMessageException("null data"); // fragments already released???
if (_log.shouldLog(Log.DEBUG))
- _log.debug("RECV(" + data.length + "): " + Base64.encode(data)
- + " " + _context.sha().calculateHash(data).toBase64());
+ _log.debug("RECV(" + data.length + "): "); // + Base64.encode(data)
+ //+ " " + _context.sha().calculateHash(data).toBase64());
I2NPMessage m = new I2NPMessageHandler(_context).readMessage(data);
noteReception(m.getUniqueId(), fragmentCount-1, "complete: ");// + msg.toString());
noteCompletion(m.getUniqueId());
@@ -498,15 +509,17 @@ public class FragmentHandler {
synchronized (_fragmentedMessages) {
removed = (null != _fragmentedMessages.remove(new Long(_msg.getMessageId())));
}
- if (removed && !_msg.getReleased()) {
- _failed++;
- noteFailure(_msg.getMessageId(), _msg.toString());
- if (_log.shouldLog(Log.WARN))
- _log.warn("Dropped failed fragmented message: " + _msg);
- _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
- _msg.failed();
- } else {
- // succeeded before timeout
+ synchronized (_msg) {
+ if (removed && !_msg.getReleased()) {
+ _failed++;
+ noteFailure(_msg.getMessageId(), _msg.toString());
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Dropped failed fragmented message: " + _msg);
+ _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
+ _msg.failed();
+ } else {
+ // succeeded before timeout
+ }
}
}
diff --git a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java
index 77f54bb2c..10841701d 100644
--- a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java
+++ b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java
@@ -21,6 +21,8 @@ import net.i2p.util.SimpleTimer;
* Gather fragments of I2NPMessages at a tunnel endpoint, making them available
* for reading when complete.
*
+ * Warning - this is all unsynchronized here - receivers must implement synchronization
+ *
*/
public class FragmentedMessage {
private I2PAppContext _context;
@@ -282,7 +284,7 @@ public class FragmentedMessage {
if (ba != null)
buf.append(i).append(":").append(ba.getValid()).append(" bytes ");
else
- buf.append(i).append(": missing ");
+ buf.append(i).append(":missing ");
}
buf.append(" highest received: ").append(_highFragmentNum);
buf.append(" last received? ").append(_lastReceived);
diff --git a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java
index 382be541f..ab39adf89 100644
--- a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java
+++ b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java
@@ -35,8 +35,9 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver {
}
}
- if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
- return -1;
+ // We do this before the preprocessor now (i.e. before fragmentation)
+ //if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
+ // return -1;
_config.incrementSentMessages();
TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted);
diff --git a/router/java/src/net/i2p/router/tunnel/InboundSender.java b/router/java/src/net/i2p/router/tunnel/InboundSender.java
index 1fd1b915d..14c911f93 100644
--- a/router/java/src/net/i2p/router/tunnel/InboundSender.java
+++ b/router/java/src/net/i2p/router/tunnel/InboundSender.java
@@ -1,7 +1,6 @@
package net.i2p.router.tunnel;
import net.i2p.I2PAppContext;
-import net.i2p.util.Log;
/**
* Receive the preprocessed data for an inbound gateway, encrypt it, and forward
@@ -9,16 +8,12 @@ import net.i2p.util.Log;
*
*/
public class InboundSender implements TunnelGateway.Sender {
- private I2PAppContext _context;
- private Log _log;
private InboundGatewayProcessor _processor;
static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
public InboundSender(I2PAppContext ctx, HopConfig config) {
- _context = ctx;
- _log = ctx.logManager().getLog(InboundSender.class);
- _processor = new InboundGatewayProcessor(_context, config);
+ _processor = new InboundGatewayProcessor(ctx, config);
}
public long sendPreprocessed(byte[] preprocessed, TunnelGateway.Receiver receiver) {
diff --git a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java
index 9364092df..2d4f53855 100644
--- a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java
+++ b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java
@@ -3,14 +3,16 @@ package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
-import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Router;
+import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
+ * This is used for all gateways with more than zero hops.
+ *
* Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or
* fragmenting them before wrapping them up for tunnel delivery. The flow here
* is:
@@ -32,7 +34,7 @@ import net.i2p.util.Log;
*
*/
public class PumpedTunnelGateway extends TunnelGateway {
- private final List _prequeue;
+ private final List _prequeue;
private TunnelGatewayPumper _pumper;
/**
@@ -43,7 +45,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off
* to the first hop
*/
- public PumpedTunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
+ public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
super(context, preprocessor, sender, receiver);
_prequeue = new ArrayList(4);
_pumper = pumper;
@@ -78,7 +80,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
* go quickly, rather than blocking its callers on potentially substantial
* processing.
*/
- void pump(List queueBuf) {
+ void pump(List queueBuf) {
synchronized (_prequeue) {
if (_prequeue.size() > 0) {
queueBuf.addAll(_prequeue);
@@ -88,7 +90,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
}
}
long startAdd = System.currentTimeMillis();
- long beforeLock = System.currentTimeMillis();
+ long beforeLock = startAdd;
long afterAdded = -1;
boolean delayedFlush = false;
long delayAmount = -1;
@@ -108,7 +110,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
// expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) {
- Pending m = (Pending)_queue.get(i);
+ Pending m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);
diff --git a/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java
new file mode 100644
index 000000000..5f415b9c3
--- /dev/null
+++ b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java
@@ -0,0 +1,49 @@
+package net.i2p.router.tunnel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import net.i2p.data.Hash;
+import net.i2p.data.TunnelId;
+import net.i2p.data.i2np.I2NPMessage;
+import net.i2p.router.RouterContext;
+
+/**
+ * Same as PTG, but check to see if a message should be dropped before queueing it.
+ * Used for IBGWs.
+ *
+ */
+public class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
+ /** saved so we can note messages that get dropped */
+ private HopConfig _config;
+
+ public ThrottledPumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender,
+ Receiver receiver, TunnelGatewayPumper pumper, HopConfig config) {
+ super(context, preprocessor, sender, receiver, pumper);
+ _config = config;
+ }
+
+ /**
+ * Possibly drop a message due to bandwidth before adding it to the preprocessor queue.
+ * We do this here instead of in the InboundGatewayReceiver because it is much smarter to drop
+ * whole I2NP messages, where we know the message type and length, rather than
+ * tunnel messages containing I2NP fragments.
+ */
+ @Override
+ public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
+ //_log.error("IBGW count: " + _config.getProcessedMessagesCount() + " type: " + msg.getType() + " size: " + msg.getMessageSize());
+
+ // Hard to do this exactly, but we'll assume 2:1 batching
+ // for the purpose of estimating outgoing size.
+ // We assume that it's the outbound bandwidth that is the issue...
+ int size = Math.max(msg.getMessageSize(), 1024/2);
+ if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW " + msg.getType(), size)) {
+ // this overstates the stat somewhat, but ok for now
+ int kb = (size + 1023) / 1024;
+ for (int i = 0; i < kb; i++)
+ _config.incrementProcessedMessages();
+ return;
+ }
+ super.add(msg, toRouter,toTunnel);
+ }
+}
diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java
index bd7bc7362..6cda3071f 100644
--- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java
+++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java
@@ -17,6 +17,7 @@ import net.i2p.util.Log;
* each of those out. This does not coallesce message fragments or delay for more
* optimal throughput.
*
+ * See FragmentHandler Javadoc for tunnel message fragment format
*/
public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
protected RouterContext _context;
@@ -175,6 +176,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
//_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength);
int paddingRemaining = numPadBytes;
+ // FIXME inefficient, waste of 3/4 of the entropy
+ // Should get a byte array of random, change all the zeros to something else, and ArrayCopy
while (paddingRemaining > 0) {
byte b = (byte)(_context.random().nextInt() & 0xFF);
if (b != 0x00) {
@@ -196,7 +199,11 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
private static final byte MASK_TYPE = FragmentHandler.MASK_TYPE;
/** is this the first of a fragmented message? */
private static final byte MASK_FRAGMENTED = FragmentHandler.MASK_FRAGMENTED;
- /** are there follow up headers? */
+
+ /**
+ * are there follow up headers?
+ * @deprecated unimplemented
+ */
private static final byte MASK_EXTENDED = FragmentHandler.MASK_EXTENDED;
private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5);
private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
@@ -311,19 +318,30 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
return offset;
}
+ /**
+ * @return generally 3 or 35 or 39 for first fragment, 7 for subsequent fragments.
+ *
+ * Does NOT include 4 for the message ID if the message will be fragmented;
+ * call getInstructionAugmentationSize() for that.
+ */
protected int getInstructionsSize(TunnelGateway.Pending msg) {
if (msg.getFragmentNumber() > 0)
return 7;
+ // control byte
int header = 1;
+ // tunnel ID
if (msg.getToTunnel() != null)
header += 4;
+ // router hash
if (msg.getToRouter() != null)
header += 32;
+ // size
header += 2;
return header;
}
+ /** @return 0 or 4 */
protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) {
int payloadLength = msg.getData().length - msg.getOffset();
if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java
index 80426c238..05721d0fb 100644
--- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java
+++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java
@@ -240,7 +240,7 @@ public class TunnelDispatcher implements Service {
TunnelGateway.Sender sender = new InboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg);
//TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
- TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
+ TunnelGateway gw = new ThrottledPumpedTunnelGateway(_context, preproc, sender, receiver, _pumper, cfg);
TunnelId recvId = cfg.getReceiveTunnel();
_inboundGateways.put(recvId, gw);
_participatingConfig.put(recvId, cfg);
diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java
index 2b394e6da..1eb9897e9 100644
--- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java
+++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java
@@ -3,12 +3,12 @@ package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
-import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Router;
+import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
@@ -34,9 +34,9 @@ import net.i2p.util.SimpleTimer;
*
*/
public class TunnelGateway {
- protected I2PAppContext _context;
+ protected RouterContext _context;
protected Log _log;
- protected final List _queue;
+ protected final List _queue;
protected QueuePreprocessor _preprocessor;
protected Sender _sender;
protected Receiver _receiver;
@@ -53,7 +53,7 @@ public class TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off
* to the first hop
*/
- public TunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
+ public TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
_context = context;
_log = context.logManager().getLog(getClass());
_queue = new ArrayList(4);
@@ -110,7 +110,7 @@ public class TunnelGateway {
// expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) {
- Pending m = (Pending)_queue.get(i);
+ Pending m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);
@@ -280,11 +280,11 @@ public class TunnelGateway {
long beforeLock = _context.clock().now();
long afterChecked = -1;
long delayAmount = -1;
- if (_queue.size() > 10000) // stay out of the synchronized block
- System.out.println("foo!");
+ //if (_queue.size() > 10000) // stay out of the synchronized block
+ // System.out.println("foo!");
synchronized (_queue) {
- if (_queue.size() > 10000) // stay in the synchronized block
- System.out.println("foo!");
+ //if (_queue.size() > 10000) // stay in the synchronized block
+ // System.out.println("foo!");
afterChecked = _context.clock().now();
if (_queue.size() > 0) {
if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
index b840dad28..205086472 100644
--- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
+++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
@@ -5,7 +5,6 @@ import java.util.List;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
-import net.i2p.util.Log;
/**
* run through the tunnel gateways that have had messages added to them and push
@@ -13,14 +12,12 @@ import net.i2p.util.Log;
*/
public class TunnelGatewayPumper implements Runnable {
private RouterContext _context;
- private Log _log;
- private final List _wantsPumping;
+ private final List _wantsPumping;
private boolean _stop;
/** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx;
- _log = ctx.logManager().getLog(getClass());
_wantsPumping = new ArrayList(64);
_stop = false;
for (int i = 0; i < 4; i++)
@@ -40,12 +37,12 @@ public class TunnelGatewayPumper implements Runnable {
public void run() {
PumpedTunnelGateway gw = null;
- List queueBuf = new ArrayList(32);
+ List queueBuf = new ArrayList(32);
while (!_stop) {
try {
synchronized (_wantsPumping) {
if (_wantsPumping.size() > 0)
- gw = (PumpedTunnelGateway)_wantsPumping.remove(0);
+ gw = _wantsPumping.remove(0);
else
_wantsPumping.wait();
}
diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java
index dfbdd3eca..afe149fc6 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java
+++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java
@@ -55,6 +55,9 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
// remove us from the pool (but not the dispatcher) so that we aren't
// selected again. _expireJob is left to do its thing, in case there
// are any straggling messages coming down the tunnel
+ //
+ // Todo: Maybe delay or prevent failing if we are near tunnel build capacity,
+ // to prevent collapse (loss of all tunnels)
_pool.tunnelFailed(this);
if (_testJob != null) // just in case...
_context.jobQueue().removeJob(_testJob);