From 629d12ade12737d323d33a8a3084002cab511835 Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 22 Dec 2009 15:08:10 +0000 Subject: [PATCH] * Tunnels: - Do RED dropping before the IBGW fragmenter, not after - Change batch time to 250ms for IBGWs (was 100ms) - Change batch time to 150ms for exploratory OBGWs (was 100ms) - Start a new message in the fragmenter if almost full - Fix a major, longstanding synchronization bug in the FragmentHandler which led to corrupt messages at the endpoints - More cleanups and comments --- history.txt | 10 ++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../router/tunnel/BatchedPreprocessor.java | 90 +++++++--- .../tunnel/BatchedRouterPreprocessor.java | 82 ++++++--- .../i2p/router/tunnel/FragmentHandler.java | 155 ++++++++++-------- .../i2p/router/tunnel/FragmentedMessage.java | 4 +- .../router/tunnel/InboundGatewayReceiver.java | 5 +- .../net/i2p/router/tunnel/InboundSender.java | 7 +- .../router/tunnel/PumpedTunnelGateway.java | 14 +- .../tunnel/ThrottledPumpedTunnelGateway.java | 49 ++++++ .../router/tunnel/TrivialPreprocessor.java | 20 ++- .../i2p/router/tunnel/TunnelDispatcher.java | 2 +- .../net/i2p/router/tunnel/TunnelGateway.java | 18 +- .../router/tunnel/TunnelGatewayPumper.java | 9 +- .../pool/PooledTunnelCreatorConfig.java | 3 + 15 files changed, 312 insertions(+), 158 deletions(-) create mode 100644 router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java diff --git a/history.txt b/history.txt index 4974fa3da..e23cb9cf0 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,13 @@ +2009-12-22 zzz + * Tunnels: + - Do RED dropping before the IBGW fragmenter, not after + - Change batch time to 250ms for IBGWs (was 100ms) + - Change batch time to 150ms for exploratory OBGWs (was 100ms) + - Start a new message in the fragmenter if almost full + - Fix a major, longstanding synchronization bug in the FragmentHandler + which led to corrupt messages at the endpoints + - More cleanups and comments + 2009-12-20 zzz * Console: - Fix status to show a disconnected network error rather than diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 1440f81c0..217d41d54 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 10; + public final static long BUILD = 11; /** for example "-test" */ public final static String EXTRA = ""; public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA; diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index 30fbbdd27..cfaabcfcb 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -67,6 +67,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); } + /** 1003 */ private static final int FULL_SIZE = PREPROCESSED_SIZE - IV_SIZE - 1 // 0x00 ending the padding @@ -76,11 +77,28 @@ public class BatchedPreprocessor extends TrivialPreprocessor { /* not final or private so the test code can adjust */ static long DEFAULT_DELAY = 100; - /** Wait up to this long before sending (flushing) a small tunnel message */ + /** + * Wait up to this long before sending (flushing) a small tunnel message + * Warning - overridden in BatchedRouterPreprocessor + */ protected long getSendDelay() { return DEFAULT_DELAY; } - /** if we have 50 messages queued that are too small, flush them anyway */ - private static final int FORCE_BATCH_FLUSH = 50; + /** + * if we have this many messages queued that are too small, flush them anyway + * Even small messages take up about 200 bytes or so. + */ + private static final int FORCE_BATCH_FLUSH = 5; + + /** If we have this much allocated, flush anyway. + * Tune this to trade off padding vs. fragmentation. + * The lower the value, the more we are willing to send off + * a tunnel msg that isn't full so the next message can start + * in a new tunnel msg to minimize fragmentation. + * + * This should be at most FULL_SIZE - (39 + a few), since + * you want to at least fit in the instructions and a few bytes. + */ + private static final int FULL_ENOUGH_SIZE = (FULL_SIZE * 80) / 100; /** how long do we want to wait before flushing */ @Override @@ -100,6 +118,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor { /* See TunnelGateway.QueuePreprocessor for Javadoc */ @Override public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + if (_log.shouldLog(Log.INFO)) + display(0, pending, "Starting"); StringBuilder timingBuf = null; if (_log.shouldLog(Log.DEBUG)) { _log.debug("Preprocess queue with " + pending.size() + " to send"); @@ -155,9 +175,10 @@ public class BatchedPreprocessor extends TrivialPreprocessor { _context.statManager().addRateData("tunnel.batchFullFragments", 1, 0); long afterSend = System.currentTimeMillis(); if (_log.shouldLog(Log.INFO)) - _log.info("Allocated=" + allocated + " so we sent " + (i+1) - + " (last complete? " + (msg.getOffset() >= msg.getData().length) - + ", off=" + msg.getOffset() + ", count=" + pending.size() + ")"); + display(allocated, pending, "Sent the message with " + (i+1)); + //_log.info(_name + ": Allocated=" + allocated + "B, Sent " + (i+1) + // + " msgs (last complete? " + (msg.getOffset() >= msg.getData().length) + // + ", off=" + msg.getOffset() + ", pending=" + pending.size() + ")"); // Remove what we sent from the pending queue for (int j = 0; j < i; j++) { @@ -197,7 +218,6 @@ public class BatchedPreprocessor extends TrivialPreprocessor { timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append("."); } // for - long afterCleared = System.currentTimeMillis(); if (_log.shouldLog(Log.INFO)) display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size())); long afterDisplayed = System.currentTimeMillis(); @@ -205,7 +225,12 @@ public class BatchedPreprocessor extends TrivialPreprocessor { // After going through the entire pending list, we have only a partial message. // We might flush it or might not, but we are returning either way. - if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) { + if ( (pending.size() > FORCE_BATCH_FLUSH) || // enough msgs - or + ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) || // time to flush - or + (allocated >= FULL_ENOUGH_SIZE)) { // full enough + //(pending.get(0).getFragmentNumber() > 0)) { // don't delay anybody's last fragment, + // // which would be the first fragment in the message + // not even a full message, but we want to flush it anyway if (pending.size() > 1) @@ -215,22 +240,24 @@ public class BatchedPreprocessor extends TrivialPreprocessor { send(pending, 0, pending.size()-1, sender, rec); _context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0); - // Remove everything in the message from the pending queue + // Remove everything in the outgoing message from the pending queue int beforeSize = pending.size(); - for (int i = 0; i < pending.size(); i++) { - TunnelGateway.Pending cur = pending.get(i); - if (cur.getOffset() >= cur.getData().length) { - pending.remove(i); - notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining"); - _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0); - _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); - i--; - } + for (int i = 0; i < beforeSize; i++) { + TunnelGateway.Pending cur = pending.get(0); + if (cur.getOffset() < cur.getData().length) + break; + pending.remove(0); + notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining"); + _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0); + _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); } + if (pending.size() > 0) { + // rare _pendingSince = _context.clock().now(); _context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize); - display(allocated, pending, "flushed, some remain"); + if (_log.shouldLog(Log.INFO)) + display(allocated, pending, "flushed, some remain"); if (timingBuf != null) { timingBuf.append(" flushed, some remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")"); @@ -239,12 +266,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor { } return true; } else { - long delayAmount = _context.clock().now() - _pendingSince; - _pendingSince = 0; + long delayAmount = 0; + if (_pendingSince > 0) { + delayAmount = _context.clock().now() - _pendingSince; + _pendingSince = 0; + } if (batchCount > 1) _context.statManager().addRateData("tunnel.batchCount", batchCount, 0); if (_log.shouldLog(Log.INFO)) - display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount); + display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount + "ms"); if (timingBuf != null) { timingBuf.append(" flushed, none remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")"); @@ -262,7 +292,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor { if (batchCount > 1) _context.statManager().addRateData("tunnel.batchCount", batchCount, 0); // not yet time to send the delayed flush - display(allocated, pending, "dont flush"); + if (_log.shouldLog(Log.INFO)) + display(allocated, pending, "dont flush"); if (timingBuf != null) { timingBuf.append(" dont flush (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")"); @@ -293,20 +324,25 @@ public class BatchedPreprocessor extends TrivialPreprocessor { return false; } + /* + * Only if Log.INFO + * + * title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc. + */ private void display(long allocated, List pending, String title) { if (_log.shouldLog(Log.INFO)) { long highestDelay = 0; - StringBuilder buf = new StringBuilder(); + StringBuilder buf = new StringBuilder(128); buf.append(_name).append(": "); buf.append(title); - buf.append(" allocated: ").append(allocated); + buf.append(" - allocated: ").append(allocated); buf.append(" pending: ").append(pending.size()); if (_pendingSince > 0) buf.append(" delay: ").append(getDelayAmount(false)); for (int i = 0; i < pending.size(); i++) { TunnelGateway.Pending curPending = pending.get(i); - buf.append(" pending[").append(i).append("]: "); - buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/'); + buf.append(" [").append(i).append("]:"); + buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/'); buf.append(curPending.getLifetime()); if (curPending.getLifetime() > highestDelay) highestDelay = curPending.getLifetime(); diff --git a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java index 6af65e88e..71d80b011 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java @@ -14,68 +14,96 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor { protected RouterContext _routerContext; private TunnelCreatorConfig _config; protected HopConfig _hopConfig; + private final long _sendDelay; /** * How frequently should we flush non-full messages, in milliseconds + * This goes in I2CP custom options for the pool. + * Only applies to OBGWs. */ public static final String PROP_BATCH_FREQUENCY = "batchFrequency"; + /** This goes in router advanced config */ public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency"; - public static final int DEFAULT_BATCH_FREQUENCY = 100; + /** for client OBGWs only (our data) */ + public static final int OB_CLIENT_BATCH_FREQ = 100; + /** for exploratory OBGWs only (our tunnel tests and build messages) */ + public static final int OB_EXPL_BATCH_FREQ = 150; + /** for IBGWs for efficiency (not our data) */ + public static final int DEFAULT_BATCH_FREQUENCY = 250; - public BatchedRouterPreprocessor(RouterContext ctx) { - this(ctx, (HopConfig)null); - } + /** for OBGWs */ public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) { super(ctx, getName(cfg)); _routerContext = ctx; _config = cfg; + _sendDelay = initialSendDelay(); } + + /** for IBGWs */ public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) { super(ctx, getName(cfg)); _routerContext = ctx; _hopConfig = cfg; + _sendDelay = initialSendDelay(); } private static String getName(HopConfig cfg) { - if (cfg == null) return "[unknown]"; + if (cfg == null) return "IB??"; if (cfg.getReceiveTunnel() != null) - return cfg.getReceiveTunnel().getTunnelId() + ""; + return "IB " + cfg.getReceiveTunnel().getTunnelId(); else if (cfg.getSendTunnel() != null) - return cfg.getSendTunnel().getTunnelId() + ""; + return "IB " + cfg.getSendTunnel().getTunnelId(); else - return "[n/a]"; + return "IB??"; } private static String getName(TunnelCreatorConfig cfg) { - if (cfg == null) return "[unknown]"; + if (cfg == null) return "OB??"; if (cfg.getReceiveTunnelId(0) != null) - return cfg.getReceiveTunnelId(0).getTunnelId() + ""; + return "OB " + cfg.getReceiveTunnelId(0).getTunnelId(); else if (cfg.getSendTunnelId(0) != null) - return cfg.getSendTunnelId(0).getTunnelId() + ""; + return "OB " + cfg.getSendTunnelId(0).getTunnelId(); else - return "[n/a]"; + return "OB??"; } - /** how long should we wait before flushing */ + /** + * how long should we wait before flushing + */ @Override - protected long getSendDelay() { - String freq = null; + protected long getSendDelay() { return _sendDelay; } + + /* + * Extend the batching time for exploratory OBGWs, they have a lot of small + * tunnel test messages, and build messages that don't fit perfectly. + * And these are not as delay-sensitive. + * + * We won't pick up config changes after the preprocessor is created, + * but a preprocessor lifetime is only 10 minutes, so just wait... + */ + private long initialSendDelay() { if (_config != null) { Properties opts = _config.getOptions(); - if (opts != null) - freq = opts.getProperty(PROP_BATCH_FREQUENCY); - } - if (freq == null) - freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY); - - if (freq != null) { - try { - return Integer.parseInt(freq); - } catch (NumberFormatException nfe) { - return DEFAULT_BATCH_FREQUENCY; + if (opts != null) { + String freq = opts.getProperty(PROP_BATCH_FREQUENCY); + if (freq != null) { + try { + return Integer.parseInt(freq); + } catch (NumberFormatException nfe) {} + } } } - return DEFAULT_BATCH_FREQUENCY; + + int def; + if (_config != null) { + if (_config.getDestination() != null) + def = OB_CLIENT_BATCH_FREQ; + else + def = OB_EXPL_BATCH_FREQ; + } else { + def = DEFAULT_BATCH_FREQUENCY; + } + return _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY, def); } @Override diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index f294d5c78..6e4c00e1c 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -39,29 +39,32 @@ data into the raw tunnel payload:

  • a series of zero or more { instructions, message } pairs
  • +

    Note that the padding, if any, must be before the instruction/message pairs. +there is no provision for padding at the end.

    +

    The instructions are encoded with a single control byte, followed by any necessary additional information. The first bit in that control byte determines how the remainder of the header is interpreted - if it is not set, the message is either not fragmented or this is the first fragment in the message. If it is set, this is a follow on fragment.

    -

    With the first bit being 0, the instructions are:

    +

    With the first (leftmost or MSB) bit being 0, the instructions are:

    • 1 byte control byte:
             bit 0: is follow on fragment?  (1 = true, 0 = false, must be 0)
          bits 1-2: delivery type
                    (0x0 = LOCAL, 0x01 = TUNNEL, 0x02 = ROUTER)
      -      bit 3: delay included?  (1 = true, 0 = false)
      +      bit 3: delay included?  (1 = true, 0 = false) (unimplemented)
             bit 4: fragmented?  (1 = true, 0 = false)
      -      bit 5: extended options?  (1 = true, 0 = false)
      +      bit 5: extended options?  (1 = true, 0 = false) (unimplemented)
          bits 6-7: reserved
    • if the delivery type was TUNNEL, a 4 byte tunnel ID
    • if the delivery type was TUNNEL or ROUTER, a 32 byte router hash
    • -
    • if the delay included flag is true, a 1 byte value:
      +
    • if the delay included flag is true, a 1 byte value (unimplemented):
             bit 0: type (0 = strict, 1 = randomized)
          bits 1-7: delay exponent (2^value minutes)
    • if the fragmented flag is true, a 4 byte message ID
    • -
    • if the extended options flag is true:
      +
    • if the extended options flag is true (unimplemented):
          = a 1 byte option size (in bytes)
          = that many bytes
    • 2 byte size of the I2NP message or this fragment
    • @@ -85,10 +88,11 @@ preprocessed payload must be padded to a multiple of 16 bytes.

      public class FragmentHandler { private I2PAppContext _context; private Log _log; - private final Map _fragmentedMessages; + private final Map _fragmentedMessages; private DefragmentedReceiver _receiver; private int _completed; private int _failed; + private static final long[] RATES = { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }; /** don't wait more than 60s to defragment the partial message */ static long MAX_DEFRAGMENT_TIME = 60*1000; @@ -97,18 +101,18 @@ public class FragmentHandler { public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) { _context = context; _log = context.logManager().getLog(FragmentHandler.class); - _fragmentedMessages = new HashMap(4); + _fragmentedMessages = new HashMap(8); _receiver = receiver; _context.statManager().createRateStat("tunnel.smallFragments", "How many pad bytes are in small fragments?", - "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); + "Tunnels", RATES); _context.statManager().createRateStat("tunnel.fullFragments", "How many tunnel messages use the full data area?", - "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); + "Tunnels", RATES); _context.statManager().createRateStat("tunnel.fragmentedComplete", "How many fragments were in a completely received message?", - "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); + "Tunnels", RATES); _context.statManager().createRateStat("tunnel.fragmentedDropped", "How many fragments were in a partially received yet failed message?", - "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); + "Tunnels", RATES); _context.statManager().createRateStat("tunnel.corruptMessage", "How many corrupted messages arrived?", - "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); + "Tunnels", RATES); } /** @@ -233,7 +237,7 @@ public class FragmentHandler { + " != " + Base64.encode(v.getData(), 0, 4)); _log.warn("No matching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1) + " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd + ' ' - + Base64.encode(preprocessed, offset, length)); + + Base64.encode(preprocessed, offset, length), new Exception("trace")); } } @@ -254,11 +258,12 @@ public class FragmentHandler { static final byte MASK_TYPE = (byte)(3 << 5); /** is this the first of a fragmented message? */ static final byte MASK_FRAGMENTED = (byte)(1 << 3); - /** are there follow up headers? */ + /** are there follow up headers? UNIMPLEMENTED */ static final byte MASK_EXTENDED = (byte)(1 << 2); /** for subsequent fragments, which bits contain the fragment #? */ private static final int MASK_FRAGMENT_NUM = (byte)((1 << 7) - 2); // 0x7E; + /** LOCAL isn't explicitly used anywhere, because the code knows that it is 0 */ static final short TYPE_LOCAL = 0; static final short TYPE_TUNNEL = 1; static final short TYPE_ROUTER = 2; @@ -268,8 +273,8 @@ public class FragmentHandler { */ private int receiveFragment(byte preprocessed[], int offset, int length) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("CONTROL: " + Integer.toHexString(preprocessed[offset]) + " / " - + "/" + Base64.encode(preprocessed, offset, 1) + " at offset " + offset); + _log.debug("CONTROL: 0x" + Integer.toHexString(preprocessed[offset] & 0xff) + + " at offset " + offset); if (0 == (preprocessed[offset] & MASK_IS_SUBSEQUENT)) return receiveInitialFragment(preprocessed, offset, length); else @@ -330,42 +335,48 @@ public class FragmentHandler { int size = (int)DataHelper.fromLong(preprocessed, offset, 2); offset += 2; - boolean isNew = false; FragmentedMessage msg = null; if (fragmented) { synchronized (_fragmentedMessages) { - msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId)); + msg = _fragmentedMessages.get(new Long(messageId)); if (msg == null) { msg = new FragmentedMessage(_context); _fragmentedMessages.put(new Long(messageId), msg); - isNew = true; } } } else { msg = new FragmentedMessage(_context); } - boolean ok = msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId); - if (!ok) return -1; - if (msg.isComplete()) { - if (fragmented) { - synchronized (_fragmentedMessages) { - _fragmentedMessages.remove(new Long(messageId)); + if (fragmented) { + // synchronized is required, fragments may be arriving in different threads + synchronized(msg) { + boolean ok = msg.receive(messageId, preprocessed, offset, size, false, router, tunnelId); + if (!ok) return -1; + if (msg.isComplete()) { + synchronized (_fragmentedMessages) { + _fragmentedMessages.remove(new Long(messageId)); + } + if (msg.getExpireEvent() != null) + SimpleTimer.getInstance().removeEvent(msg.getExpireEvent()); + receiveComplete(msg); + } else { + noteReception(msg.getMessageId(), 0, msg); + if (msg.getExpireEvent() == null) { + RemoveFailed evt = new RemoveFailed(msg); + msg.setExpireEvent(evt); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId); + SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME); + } } } - if (msg.getExpireEvent() != null) - SimpleTimer.getInstance().removeEvent(msg.getExpireEvent()); + } else { + // synchronized not required if !fragmented + boolean ok = msg.receive(messageId, preprocessed, offset, size, true, router, tunnelId); + if (!ok) return -1; + // always complete, never an expire event receiveComplete(msg); - } else { - noteReception(msg.getMessageId(), 0, msg); - } - - if (isNew && fragmented && !msg.isComplete()) { - RemoveFailed evt = new RemoveFailed(msg); - msg.setExpireEvent(evt); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId); - SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME); } offset += size; @@ -397,38 +408,38 @@ public class FragmentHandler { throw new RuntimeException("Preprocessed message was invalid [messageId =" + messageId + " size=" + size + " offset=" + offset + " fragment=" + fragmentNum); - boolean isNew = false; FragmentedMessage msg = null; synchronized (_fragmentedMessages) { - msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId)); + msg = _fragmentedMessages.get(new Long(messageId)); if (msg == null) { msg = new FragmentedMessage(_context); _fragmentedMessages.put(new Long(messageId), msg); - isNew = true; } } - boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast); - if (!ok) return -1; - - if (msg.isComplete()) { - synchronized (_fragmentedMessages) { - _fragmentedMessages.remove(new Long(messageId)); + // synchronized is required, fragments may be arriving in different threads + synchronized(msg) { + boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast); + if (!ok) return -1; + + if (msg.isComplete()) { + synchronized (_fragmentedMessages) { + _fragmentedMessages.remove(new Long(messageId)); + } + if (msg.getExpireEvent() != null) + SimpleTimer.getInstance().removeEvent(msg.getExpireEvent()); + _context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime()); + receiveComplete(msg); + } else { + noteReception(msg.getMessageId(), fragmentNum, msg); + if (msg.getExpireEvent() == null) { + RemoveFailed evt = new RemoveFailed(msg); + msg.setExpireEvent(evt); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum); + SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME); + } } - if (msg.getExpireEvent() != null) - SimpleTimer.getInstance().removeEvent(msg.getExpireEvent()); - _context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime()); - receiveComplete(msg); - } else { - noteReception(msg.getMessageId(), fragmentNum, msg); - } - - if (isNew && !msg.isComplete()) { - RemoveFailed evt = new RemoveFailed(msg); - msg.setExpireEvent(evt); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum); - SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME); } offset += size; @@ -449,8 +460,8 @@ public class FragmentHandler { if (data == null) throw new I2NPMessageException("null data"); // fragments already released??? if (_log.shouldLog(Log.DEBUG)) - _log.debug("RECV(" + data.length + "): " + Base64.encode(data) - + " " + _context.sha().calculateHash(data).toBase64()); + _log.debug("RECV(" + data.length + "): "); // + Base64.encode(data) + //+ " " + _context.sha().calculateHash(data).toBase64()); I2NPMessage m = new I2NPMessageHandler(_context).readMessage(data); noteReception(m.getUniqueId(), fragmentCount-1, "complete: ");// + msg.toString()); noteCompletion(m.getUniqueId()); @@ -498,15 +509,17 @@ public class FragmentHandler { synchronized (_fragmentedMessages) { removed = (null != _fragmentedMessages.remove(new Long(_msg.getMessageId()))); } - if (removed && !_msg.getReleased()) { - _failed++; - noteFailure(_msg.getMessageId(), _msg.toString()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Dropped failed fragmented message: " + _msg); - _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime()); - _msg.failed(); - } else { - // succeeded before timeout + synchronized (_msg) { + if (removed && !_msg.getReleased()) { + _failed++; + noteFailure(_msg.getMessageId(), _msg.toString()); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropped failed fragmented message: " + _msg); + _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime()); + _msg.failed(); + } else { + // succeeded before timeout + } } } diff --git a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java index 77f54bb2c..10841701d 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java @@ -21,6 +21,8 @@ import net.i2p.util.SimpleTimer; * Gather fragments of I2NPMessages at a tunnel endpoint, making them available * for reading when complete. * + * Warning - this is all unsynchronized here - receivers must implement synchronization + * */ public class FragmentedMessage { private I2PAppContext _context; @@ -282,7 +284,7 @@ public class FragmentedMessage { if (ba != null) buf.append(i).append(":").append(ba.getValid()).append(" bytes "); else - buf.append(i).append(": missing "); + buf.append(i).append(":missing "); } buf.append(" highest received: ").append(_highFragmentNum); buf.append(" last received? ").append(_lastReceived); diff --git a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java index 382be541f..ab39adf89 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java @@ -35,8 +35,9 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver { } } - if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length)) - return -1; + // We do this before the preprocessor now (i.e. before fragmentation) + //if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length)) + // return -1; _config.incrementSentMessages(); TunnelDataMessage msg = new TunnelDataMessage(_context); msg.setData(encrypted); diff --git a/router/java/src/net/i2p/router/tunnel/InboundSender.java b/router/java/src/net/i2p/router/tunnel/InboundSender.java index 1fd1b915d..14c911f93 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundSender.java +++ b/router/java/src/net/i2p/router/tunnel/InboundSender.java @@ -1,7 +1,6 @@ package net.i2p.router.tunnel; import net.i2p.I2PAppContext; -import net.i2p.util.Log; /** * Receive the preprocessed data for an inbound gateway, encrypt it, and forward @@ -9,16 +8,12 @@ import net.i2p.util.Log; * */ public class InboundSender implements TunnelGateway.Sender { - private I2PAppContext _context; - private Log _log; private InboundGatewayProcessor _processor; static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION; public InboundSender(I2PAppContext ctx, HopConfig config) { - _context = ctx; - _log = ctx.logManager().getLog(InboundSender.class); - _processor = new InboundGatewayProcessor(_context, config); + _processor = new InboundGatewayProcessor(ctx, config); } public long sendPreprocessed(byte[] preprocessed, TunnelGateway.Receiver receiver) { diff --git a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java index 9364092df..2d4f53855 100644 --- a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java @@ -3,14 +3,16 @@ package net.i2p.router.tunnel; import java.util.ArrayList; import java.util.List; -import net.i2p.I2PAppContext; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.Router; +import net.i2p.router.RouterContext; import net.i2p.util.Log; /** + * This is used for all gateways with more than zero hops. + * * Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or * fragmenting them before wrapping them up for tunnel delivery. The flow here * is:
        @@ -32,7 +34,7 @@ import net.i2p.util.Log; * */ public class PumpedTunnelGateway extends TunnelGateway { - private final List _prequeue; + private final List _prequeue; private TunnelGatewayPumper _pumper; /** @@ -43,7 +45,7 @@ public class PumpedTunnelGateway extends TunnelGateway { * @param receiver this receives the encrypted message and forwards it off * to the first hop */ - public PumpedTunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { + public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { super(context, preprocessor, sender, receiver); _prequeue = new ArrayList(4); _pumper = pumper; @@ -78,7 +80,7 @@ public class PumpedTunnelGateway extends TunnelGateway { * go quickly, rather than blocking its callers on potentially substantial * processing. */ - void pump(List queueBuf) { + void pump(List queueBuf) { synchronized (_prequeue) { if (_prequeue.size() > 0) { queueBuf.addAll(_prequeue); @@ -88,7 +90,7 @@ public class PumpedTunnelGateway extends TunnelGateway { } } long startAdd = System.currentTimeMillis(); - long beforeLock = System.currentTimeMillis(); + long beforeLock = startAdd; long afterAdded = -1; boolean delayedFlush = false; long delayAmount = -1; @@ -108,7 +110,7 @@ public class PumpedTunnelGateway extends TunnelGateway { // expire any as necessary, even if its framented for (int i = 0; i < _queue.size(); i++) { - Pending m = (Pending)_queue.get(i); + Pending m = _queue.get(i); if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m); diff --git a/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java new file mode 100644 index 000000000..5f415b9c3 --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java @@ -0,0 +1,49 @@ +package net.i2p.router.tunnel; + +import java.util.ArrayList; +import java.util.List; + +import net.i2p.data.Hash; +import net.i2p.data.TunnelId; +import net.i2p.data.i2np.I2NPMessage; +import net.i2p.router.RouterContext; + +/** + * Same as PTG, but check to see if a message should be dropped before queueing it. + * Used for IBGWs. + * + */ +public class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway { + /** saved so we can note messages that get dropped */ + private HopConfig _config; + + public ThrottledPumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, + Receiver receiver, TunnelGatewayPumper pumper, HopConfig config) { + super(context, preprocessor, sender, receiver, pumper); + _config = config; + } + + /** + * Possibly drop a message due to bandwidth before adding it to the preprocessor queue. + * We do this here instead of in the InboundGatewayReceiver because it is much smarter to drop + * whole I2NP messages, where we know the message type and length, rather than + * tunnel messages containing I2NP fragments. + */ + @Override + public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { + //_log.error("IBGW count: " + _config.getProcessedMessagesCount() + " type: " + msg.getType() + " size: " + msg.getMessageSize()); + + // Hard to do this exactly, but we'll assume 2:1 batching + // for the purpose of estimating outgoing size. + // We assume that it's the outbound bandwidth that is the issue... + int size = Math.max(msg.getMessageSize(), 1024/2); + if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW " + msg.getType(), size)) { + // this overstates the stat somewhat, but ok for now + int kb = (size + 1023) / 1024; + for (int i = 0; i < kb; i++) + _config.incrementProcessedMessages(); + return; + } + super.add(msg, toRouter,toTunnel); + } +} diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index bd7bc7362..6cda3071f 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -17,6 +17,7 @@ import net.i2p.util.Log; * each of those out. This does not coallesce message fragments or delay for more * optimal throughput. * + * See FragmentHandler Javadoc for tunnel message fragment format */ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { protected RouterContext _context; @@ -175,6 +176,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { //_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength); int paddingRemaining = numPadBytes; + // FIXME inefficient, waste of 3/4 of the entropy + // Should get a byte array of random, change all the zeros to something else, and ArrayCopy while (paddingRemaining > 0) { byte b = (byte)(_context.random().nextInt() & 0xFF); if (b != 0x00) { @@ -196,7 +199,11 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { private static final byte MASK_TYPE = FragmentHandler.MASK_TYPE; /** is this the first of a fragmented message? */ private static final byte MASK_FRAGMENTED = FragmentHandler.MASK_FRAGMENTED; - /** are there follow up headers? */ + + /** + * are there follow up headers? + * @deprecated unimplemented + */ private static final byte MASK_EXTENDED = FragmentHandler.MASK_EXTENDED; private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5); private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5); @@ -311,19 +318,30 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { return offset; } + /** + * @return generally 3 or 35 or 39 for first fragment, 7 for subsequent fragments. + * + * Does NOT include 4 for the message ID if the message will be fragmented; + * call getInstructionAugmentationSize() for that. + */ protected int getInstructionsSize(TunnelGateway.Pending msg) { if (msg.getFragmentNumber() > 0) return 7; + // control byte int header = 1; + // tunnel ID if (msg.getToTunnel() != null) header += 4; + // router hash if (msg.getToRouter() != null) header += 32; + // size header += 2; return header; } + /** @return 0 or 4 */ protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) { int payloadLength = msg.getData().length - msg.getOffset(); if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) { diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 80426c238..05721d0fb 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -240,7 +240,7 @@ public class TunnelDispatcher implements Service { TunnelGateway.Sender sender = new InboundSender(_context, cfg); TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg); //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); - TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); + TunnelGateway gw = new ThrottledPumpedTunnelGateway(_context, preproc, sender, receiver, _pumper, cfg); TunnelId recvId = cfg.getReceiveTunnel(); _inboundGateways.put(recvId, gw); _participatingConfig.put(recvId, cfg); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index 2b394e6da..1eb9897e9 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -3,12 +3,12 @@ package net.i2p.router.tunnel; import java.util.ArrayList; import java.util.List; -import net.i2p.I2PAppContext; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.TunnelGatewayMessage; import net.i2p.router.Router; +import net.i2p.router.RouterContext; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -34,9 +34,9 @@ import net.i2p.util.SimpleTimer; * */ public class TunnelGateway { - protected I2PAppContext _context; + protected RouterContext _context; protected Log _log; - protected final List _queue; + protected final List _queue; protected QueuePreprocessor _preprocessor; protected Sender _sender; protected Receiver _receiver; @@ -53,7 +53,7 @@ public class TunnelGateway { * @param receiver this receives the encrypted message and forwards it off * to the first hop */ - public TunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) { + public TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) { _context = context; _log = context.logManager().getLog(getClass()); _queue = new ArrayList(4); @@ -110,7 +110,7 @@ public class TunnelGateway { // expire any as necessary, even if its framented for (int i = 0; i < _queue.size(); i++) { - Pending m = (Pending)_queue.get(i); + Pending m = _queue.get(i); if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m); @@ -280,11 +280,11 @@ public class TunnelGateway { long beforeLock = _context.clock().now(); long afterChecked = -1; long delayAmount = -1; - if (_queue.size() > 10000) // stay out of the synchronized block - System.out.println("foo!"); + //if (_queue.size() > 10000) // stay out of the synchronized block + // System.out.println("foo!"); synchronized (_queue) { - if (_queue.size() > 10000) // stay in the synchronized block - System.out.println("foo!"); + //if (_queue.size() > 10000) // stay in the synchronized block + // System.out.println("foo!"); afterChecked = _context.clock().now(); if (_queue.size() > 0) { if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) ) diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java index b840dad28..205086472 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java @@ -5,7 +5,6 @@ import java.util.List; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; -import net.i2p.util.Log; /** * run through the tunnel gateways that have had messages added to them and push @@ -13,14 +12,12 @@ import net.i2p.util.Log; */ public class TunnelGatewayPumper implements Runnable { private RouterContext _context; - private Log _log; - private final List _wantsPumping; + private final List _wantsPumping; private boolean _stop; /** Creates a new instance of TunnelGatewayPumper */ public TunnelGatewayPumper(RouterContext ctx) { _context = ctx; - _log = ctx.logManager().getLog(getClass()); _wantsPumping = new ArrayList(64); _stop = false; for (int i = 0; i < 4; i++) @@ -40,12 +37,12 @@ public class TunnelGatewayPumper implements Runnable { public void run() { PumpedTunnelGateway gw = null; - List queueBuf = new ArrayList(32); + List queueBuf = new ArrayList(32); while (!_stop) { try { synchronized (_wantsPumping) { if (_wantsPumping.size() > 0) - gw = (PumpedTunnelGateway)_wantsPumping.remove(0); + gw = _wantsPumping.remove(0); else _wantsPumping.wait(); } diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java index dfbdd3eca..afe149fc6 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java @@ -55,6 +55,9 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { // remove us from the pool (but not the dispatcher) so that we aren't // selected again. _expireJob is left to do its thing, in case there // are any straggling messages coming down the tunnel + // + // Todo: Maybe delay or prevent failing if we are near tunnel build capacity, + // to prevent collapse (loss of all tunnels) _pool.tunnelFailed(this); if (_testJob != null) // just in case... _context.jobQueue().removeJob(_testJob);