* Tunnels:

- Do RED dropping before the IBGW fragmenter, not after
      - Change batch time to 250ms for IBGWs (was 100ms)
      - Change batch time to 150ms for exploratory OBGWs (was 100ms)
      - Start a new message in the fragmenter if almost full
      - Fix a major, longstanding synchronization bug in the FragmentHandler
        which led to corrupt messages at the endpoints
      - More cleanups and comments
This commit is contained in:
zzz
2009-12-22 15:08:10 +00:00
parent 08929752a6
commit 629d12ade1
15 changed files with 312 additions and 158 deletions

View File

@ -1,3 +1,13 @@
2009-12-22 zzz
* Tunnels:
- Do RED dropping before the IBGW fragmenter, not after
- Change batch time to 250ms for IBGWs (was 100ms)
- Change batch time to 150ms for exploratory OBGWs (was 100ms)
- Start a new message in the fragmenter if almost full
- Fix a major, longstanding synchronization bug in the FragmentHandler
which led to corrupt messages at the endpoints
- More cleanups and comments
2009-12-20 zzz 2009-12-20 zzz
* Console: * Console:
- Fix status to show a disconnected network error rather than - Fix status to show a disconnected network error rather than

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 10; public final static long BUILD = 11;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA; public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;

View File

@ -67,6 +67,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
} }
/** 1003 */
private static final int FULL_SIZE = PREPROCESSED_SIZE private static final int FULL_SIZE = PREPROCESSED_SIZE
- IV_SIZE - IV_SIZE
- 1 // 0x00 ending the padding - 1 // 0x00 ending the padding
@ -76,11 +77,28 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
/* not final or private so the test code can adjust */ /* not final or private so the test code can adjust */
static long DEFAULT_DELAY = 100; static long DEFAULT_DELAY = 100;
/** Wait up to this long before sending (flushing) a small tunnel message */ /**
* Wait up to this long before sending (flushing) a small tunnel message
* Warning - overridden in BatchedRouterPreprocessor
*/
protected long getSendDelay() { return DEFAULT_DELAY; } protected long getSendDelay() { return DEFAULT_DELAY; }
/** if we have 50 messages queued that are too small, flush them anyway */ /**
private static final int FORCE_BATCH_FLUSH = 50; * if we have this many messages queued that are too small, flush them anyway
* Even small messages take up about 200 bytes or so.
*/
private static final int FORCE_BATCH_FLUSH = 5;
/** If we have this much allocated, flush anyway.
* Tune this to trade off padding vs. fragmentation.
* The lower the value, the more we are willing to send off
* a tunnel msg that isn't full so the next message can start
* in a new tunnel msg to minimize fragmentation.
*
* This should be at most FULL_SIZE - (39 + a few), since
* you want to at least fit in the instructions and a few bytes.
*/
private static final int FULL_ENOUGH_SIZE = (FULL_SIZE * 80) / 100;
/** how long do we want to wait before flushing */ /** how long do we want to wait before flushing */
@Override @Override
@ -100,6 +118,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
/* See TunnelGateway.QueuePreprocessor for Javadoc */ /* See TunnelGateway.QueuePreprocessor for Javadoc */
@Override @Override
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.INFO))
display(0, pending, "Starting");
StringBuilder timingBuf = null; StringBuilder timingBuf = null;
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Preprocess queue with " + pending.size() + " to send"); _log.debug("Preprocess queue with " + pending.size() + " to send");
@ -155,9 +175,10 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
_context.statManager().addRateData("tunnel.batchFullFragments", 1, 0); _context.statManager().addRateData("tunnel.batchFullFragments", 1, 0);
long afterSend = System.currentTimeMillis(); long afterSend = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Allocated=" + allocated + " so we sent " + (i+1) display(allocated, pending, "Sent the message with " + (i+1));
+ " (last complete? " + (msg.getOffset() >= msg.getData().length) //_log.info(_name + ": Allocated=" + allocated + "B, Sent " + (i+1)
+ ", off=" + msg.getOffset() + ", count=" + pending.size() + ")"); // + " msgs (last complete? " + (msg.getOffset() >= msg.getData().length)
// + ", off=" + msg.getOffset() + ", pending=" + pending.size() + ")");
// Remove what we sent from the pending queue // Remove what we sent from the pending queue
for (int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
@ -197,7 +218,6 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append("."); timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append(".");
} // for } // for
long afterCleared = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size())); display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
long afterDisplayed = System.currentTimeMillis(); long afterDisplayed = System.currentTimeMillis();
@ -205,7 +225,12 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
// After going through the entire pending list, we have only a partial message. // After going through the entire pending list, we have only a partial message.
// We might flush it or might not, but we are returning either way. // We might flush it or might not, but we are returning either way.
if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) { if ( (pending.size() > FORCE_BATCH_FLUSH) || // enough msgs - or
( (_pendingSince > 0) && (getDelayAmount() <= 0) ) || // time to flush - or
(allocated >= FULL_ENOUGH_SIZE)) { // full enough
//(pending.get(0).getFragmentNumber() > 0)) { // don't delay anybody's last fragment,
// // which would be the first fragment in the message
// not even a full message, but we want to flush it anyway // not even a full message, but we want to flush it anyway
if (pending.size() > 1) if (pending.size() > 1)
@ -215,22 +240,24 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
send(pending, 0, pending.size()-1, sender, rec); send(pending, 0, pending.size()-1, sender, rec);
_context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0); _context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0);
// Remove everything in the message from the pending queue // Remove everything in the outgoing message from the pending queue
int beforeSize = pending.size(); int beforeSize = pending.size();
for (int i = 0; i < pending.size(); i++) { for (int i = 0; i < beforeSize; i++) {
TunnelGateway.Pending cur = pending.get(i); TunnelGateway.Pending cur = pending.get(0);
if (cur.getOffset() >= cur.getData().length) { if (cur.getOffset() < cur.getData().length)
pending.remove(i); break;
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining"); pending.remove(0);
_context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining");
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0);
i--; _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
} }
if (pending.size() > 0) { if (pending.size() > 0) {
// rare
_pendingSince = _context.clock().now(); _pendingSince = _context.clock().now();
_context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize); _context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize);
display(allocated, pending, "flushed, some remain"); if (_log.shouldLog(Log.INFO))
display(allocated, pending, "flushed, some remain");
if (timingBuf != null) { if (timingBuf != null) {
timingBuf.append(" flushed, some remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")"); timingBuf.append(" flushed, some remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@ -239,12 +266,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
} }
return true; return true;
} else { } else {
long delayAmount = _context.clock().now() - _pendingSince; long delayAmount = 0;
_pendingSince = 0; if (_pendingSince > 0) {
delayAmount = _context.clock().now() - _pendingSince;
_pendingSince = 0;
}
if (batchCount > 1) if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0); _context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount); display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount + "ms");
if (timingBuf != null) { if (timingBuf != null) {
timingBuf.append(" flushed, none remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")"); timingBuf.append(" flushed, none remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@ -262,7 +292,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
if (batchCount > 1) if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0); _context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
// not yet time to send the delayed flush // not yet time to send the delayed flush
display(allocated, pending, "dont flush"); if (_log.shouldLog(Log.INFO))
display(allocated, pending, "dont flush");
if (timingBuf != null) { if (timingBuf != null) {
timingBuf.append(" dont flush (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")"); timingBuf.append(" dont flush (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
@ -293,20 +324,25 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
return false; return false;
} }
/*
* Only if Log.INFO
*
* title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc.
*/
private void display(long allocated, List<TunnelGateway.Pending> pending, String title) { private void display(long allocated, List<TunnelGateway.Pending> pending, String title) {
if (_log.shouldLog(Log.INFO)) { if (_log.shouldLog(Log.INFO)) {
long highestDelay = 0; long highestDelay = 0;
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder(128);
buf.append(_name).append(": "); buf.append(_name).append(": ");
buf.append(title); buf.append(title);
buf.append(" allocated: ").append(allocated); buf.append(" - allocated: ").append(allocated);
buf.append(" pending: ").append(pending.size()); buf.append(" pending: ").append(pending.size());
if (_pendingSince > 0) if (_pendingSince > 0)
buf.append(" delay: ").append(getDelayAmount(false)); buf.append(" delay: ").append(getDelayAmount(false));
for (int i = 0; i < pending.size(); i++) { for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending curPending = pending.get(i); TunnelGateway.Pending curPending = pending.get(i);
buf.append(" pending[").append(i).append("]: "); buf.append(" [").append(i).append("]:");
buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/'); buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/');
buf.append(curPending.getLifetime()); buf.append(curPending.getLifetime());
if (curPending.getLifetime() > highestDelay) if (curPending.getLifetime() > highestDelay)
highestDelay = curPending.getLifetime(); highestDelay = curPending.getLifetime();

View File

@ -14,68 +14,96 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
protected RouterContext _routerContext; protected RouterContext _routerContext;
private TunnelCreatorConfig _config; private TunnelCreatorConfig _config;
protected HopConfig _hopConfig; protected HopConfig _hopConfig;
private final long _sendDelay;
/** /**
* How frequently should we flush non-full messages, in milliseconds * How frequently should we flush non-full messages, in milliseconds
* This goes in I2CP custom options for the pool.
* Only applies to OBGWs.
*/ */
public static final String PROP_BATCH_FREQUENCY = "batchFrequency"; public static final String PROP_BATCH_FREQUENCY = "batchFrequency";
/** This goes in router advanced config */
public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency"; public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency";
public static final int DEFAULT_BATCH_FREQUENCY = 100; /** for client OBGWs only (our data) */
public static final int OB_CLIENT_BATCH_FREQ = 100;
/** for exploratory OBGWs only (our tunnel tests and build messages) */
public static final int OB_EXPL_BATCH_FREQ = 150;
/** for IBGWs for efficiency (not our data) */
public static final int DEFAULT_BATCH_FREQUENCY = 250;
public BatchedRouterPreprocessor(RouterContext ctx) { /** for OBGWs */
this(ctx, (HopConfig)null);
}
public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) { public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) {
super(ctx, getName(cfg)); super(ctx, getName(cfg));
_routerContext = ctx; _routerContext = ctx;
_config = cfg; _config = cfg;
_sendDelay = initialSendDelay();
} }
/** for IBGWs */
public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) { public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) {
super(ctx, getName(cfg)); super(ctx, getName(cfg));
_routerContext = ctx; _routerContext = ctx;
_hopConfig = cfg; _hopConfig = cfg;
_sendDelay = initialSendDelay();
} }
private static String getName(HopConfig cfg) { private static String getName(HopConfig cfg) {
if (cfg == null) return "[unknown]"; if (cfg == null) return "IB??";
if (cfg.getReceiveTunnel() != null) if (cfg.getReceiveTunnel() != null)
return cfg.getReceiveTunnel().getTunnelId() + ""; return "IB " + cfg.getReceiveTunnel().getTunnelId();
else if (cfg.getSendTunnel() != null) else if (cfg.getSendTunnel() != null)
return cfg.getSendTunnel().getTunnelId() + ""; return "IB " + cfg.getSendTunnel().getTunnelId();
else else
return "[n/a]"; return "IB??";
} }
private static String getName(TunnelCreatorConfig cfg) { private static String getName(TunnelCreatorConfig cfg) {
if (cfg == null) return "[unknown]"; if (cfg == null) return "OB??";
if (cfg.getReceiveTunnelId(0) != null) if (cfg.getReceiveTunnelId(0) != null)
return cfg.getReceiveTunnelId(0).getTunnelId() + ""; return "OB " + cfg.getReceiveTunnelId(0).getTunnelId();
else if (cfg.getSendTunnelId(0) != null) else if (cfg.getSendTunnelId(0) != null)
return cfg.getSendTunnelId(0).getTunnelId() + ""; return "OB " + cfg.getSendTunnelId(0).getTunnelId();
else else
return "[n/a]"; return "OB??";
} }
/** how long should we wait before flushing */ /**
* how long should we wait before flushing
*/
@Override @Override
protected long getSendDelay() { protected long getSendDelay() { return _sendDelay; }
String freq = null;
/*
* Extend the batching time for exploratory OBGWs, they have a lot of small
* tunnel test messages, and build messages that don't fit perfectly.
* And these are not as delay-sensitive.
*
* We won't pick up config changes after the preprocessor is created,
* but a preprocessor lifetime is only 10 minutes, so just wait...
*/
private long initialSendDelay() {
if (_config != null) { if (_config != null) {
Properties opts = _config.getOptions(); Properties opts = _config.getOptions();
if (opts != null) if (opts != null) {
freq = opts.getProperty(PROP_BATCH_FREQUENCY); String freq = opts.getProperty(PROP_BATCH_FREQUENCY);
} if (freq != null) {
if (freq == null) try {
freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY); return Integer.parseInt(freq);
} catch (NumberFormatException nfe) {}
if (freq != null) { }
try {
return Integer.parseInt(freq);
} catch (NumberFormatException nfe) {
return DEFAULT_BATCH_FREQUENCY;
} }
} }
return DEFAULT_BATCH_FREQUENCY;
int def;
if (_config != null) {
if (_config.getDestination() != null)
def = OB_CLIENT_BATCH_FREQ;
else
def = OB_EXPL_BATCH_FREQ;
} else {
def = DEFAULT_BATCH_FREQUENCY;
}
return _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY, def);
} }
@Override @Override

View File

@ -39,29 +39,32 @@ data into the raw tunnel payload:</p>
<li>a series of zero or more { instructions, message } pairs</li> <li>a series of zero or more { instructions, message } pairs</li>
</ul> </ul>
<p>Note that the padding, if any, must be before the instruction/message pairs.
there is no provision for padding at the end.</p>
<p>The instructions are encoded with a single control byte, followed by any <p>The instructions are encoded with a single control byte, followed by any
necessary additional information. The first bit in that control byte determines necessary additional information. The first bit in that control byte determines
how the remainder of the header is interpreted - if it is not set, the message how the remainder of the header is interpreted - if it is not set, the message
is either not fragmented or this is the first fragment in the message. If it is is either not fragmented or this is the first fragment in the message. If it is
set, this is a follow on fragment.</p> set, this is a follow on fragment.</p>
<p>With the first bit being 0, the instructions are:</p> <p>With the first (leftmost or MSB) bit being 0, the instructions are:</p>
<ul> <ul>
<li>1 byte control byte:<pre> <li>1 byte control byte:<pre>
bit 0: is follow on fragment? (1 = true, 0 = false, must be 0) bit 0: is follow on fragment? (1 = true, 0 = false, must be 0)
bits 1-2: delivery type bits 1-2: delivery type
(0x0 = LOCAL, 0x01 = TUNNEL, 0x02 = ROUTER) (0x0 = LOCAL, 0x01 = TUNNEL, 0x02 = ROUTER)
bit 3: delay included? (1 = true, 0 = false) bit 3: delay included? (1 = true, 0 = false) (unimplemented)
bit 4: fragmented? (1 = true, 0 = false) bit 4: fragmented? (1 = true, 0 = false)
bit 5: extended options? (1 = true, 0 = false) bit 5: extended options? (1 = true, 0 = false) (unimplemented)
bits 6-7: reserved</pre></li> bits 6-7: reserved</pre></li>
<li>if the delivery type was TUNNEL, a 4 byte tunnel ID</li> <li>if the delivery type was TUNNEL, a 4 byte tunnel ID</li>
<li>if the delivery type was TUNNEL or ROUTER, a 32 byte router hash</li> <li>if the delivery type was TUNNEL or ROUTER, a 32 byte router hash</li>
<li>if the delay included flag is true, a 1 byte value:<pre> <li>if the delay included flag is true, a 1 byte value (unimplemented):<pre>
bit 0: type (0 = strict, 1 = randomized) bit 0: type (0 = strict, 1 = randomized)
bits 1-7: delay exponent (2^value minutes)</pre></li> bits 1-7: delay exponent (2^value minutes)</pre></li>
<li>if the fragmented flag is true, a 4 byte message ID</li> <li>if the fragmented flag is true, a 4 byte message ID</li>
<li>if the extended options flag is true:<pre> <li>if the extended options flag is true (unimplemented):<pre>
= a 1 byte option size (in bytes) = a 1 byte option size (in bytes)
= that many bytes</pre></li> = that many bytes</pre></li>
<li>2 byte size of the I2NP message or this fragment</li> <li>2 byte size of the I2NP message or this fragment</li>
@ -85,10 +88,11 @@ preprocessed payload must be padded to a multiple of 16 bytes.</p>
public class FragmentHandler { public class FragmentHandler {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private final Map _fragmentedMessages; private final Map<Long, FragmentedMessage> _fragmentedMessages;
private DefragmentedReceiver _receiver; private DefragmentedReceiver _receiver;
private int _completed; private int _completed;
private int _failed; private int _failed;
private static final long[] RATES = { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 };
/** don't wait more than 60s to defragment the partial message */ /** don't wait more than 60s to defragment the partial message */
static long MAX_DEFRAGMENT_TIME = 60*1000; static long MAX_DEFRAGMENT_TIME = 60*1000;
@ -97,18 +101,18 @@ public class FragmentHandler {
public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) { public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) {
_context = context; _context = context;
_log = context.logManager().getLog(FragmentHandler.class); _log = context.logManager().getLog(FragmentHandler.class);
_fragmentedMessages = new HashMap(4); _fragmentedMessages = new HashMap(8);
_receiver = receiver; _receiver = receiver;
_context.statManager().createRateStat("tunnel.smallFragments", "How many pad bytes are in small fragments?", _context.statManager().createRateStat("tunnel.smallFragments", "How many pad bytes are in small fragments?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fullFragments", "How many tunnel messages use the full data area?", _context.statManager().createRateStat("tunnel.fullFragments", "How many tunnel messages use the full data area?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fragmentedComplete", "How many fragments were in a completely received message?", _context.statManager().createRateStat("tunnel.fragmentedComplete", "How many fragments were in a completely received message?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.fragmentedDropped", "How many fragments were in a partially received yet failed message?", _context.statManager().createRateStat("tunnel.fragmentedDropped", "How many fragments were in a partially received yet failed message?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); "Tunnels", RATES);
_context.statManager().createRateStat("tunnel.corruptMessage", "How many corrupted messages arrived?", _context.statManager().createRateStat("tunnel.corruptMessage", "How many corrupted messages arrived?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }); "Tunnels", RATES);
} }
/** /**
@ -233,7 +237,7 @@ public class FragmentHandler {
+ " != " + Base64.encode(v.getData(), 0, 4)); + " != " + Base64.encode(v.getData(), 0, 4));
_log.warn("No matching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1) _log.warn("No matching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1)
+ " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd + ' ' + " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd + ' '
+ Base64.encode(preprocessed, offset, length)); + Base64.encode(preprocessed, offset, length), new Exception("trace"));
} }
} }
@ -254,11 +258,12 @@ public class FragmentHandler {
static final byte MASK_TYPE = (byte)(3 << 5); static final byte MASK_TYPE = (byte)(3 << 5);
/** is this the first of a fragmented message? */ /** is this the first of a fragmented message? */
static final byte MASK_FRAGMENTED = (byte)(1 << 3); static final byte MASK_FRAGMENTED = (byte)(1 << 3);
/** are there follow up headers? */ /** are there follow up headers? UNIMPLEMENTED */
static final byte MASK_EXTENDED = (byte)(1 << 2); static final byte MASK_EXTENDED = (byte)(1 << 2);
/** for subsequent fragments, which bits contain the fragment #? */ /** for subsequent fragments, which bits contain the fragment #? */
private static final int MASK_FRAGMENT_NUM = (byte)((1 << 7) - 2); // 0x7E; private static final int MASK_FRAGMENT_NUM = (byte)((1 << 7) - 2); // 0x7E;
/** LOCAL isn't explicitly used anywhere, because the code knows that it is 0 */
static final short TYPE_LOCAL = 0; static final short TYPE_LOCAL = 0;
static final short TYPE_TUNNEL = 1; static final short TYPE_TUNNEL = 1;
static final short TYPE_ROUTER = 2; static final short TYPE_ROUTER = 2;
@ -268,8 +273,8 @@ public class FragmentHandler {
*/ */
private int receiveFragment(byte preprocessed[], int offset, int length) { private int receiveFragment(byte preprocessed[], int offset, int length) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("CONTROL: " + Integer.toHexString(preprocessed[offset]) + " / " _log.debug("CONTROL: 0x" + Integer.toHexString(preprocessed[offset] & 0xff) +
+ "/" + Base64.encode(preprocessed, offset, 1) + " at offset " + offset); " at offset " + offset);
if (0 == (preprocessed[offset] & MASK_IS_SUBSEQUENT)) if (0 == (preprocessed[offset] & MASK_IS_SUBSEQUENT))
return receiveInitialFragment(preprocessed, offset, length); return receiveInitialFragment(preprocessed, offset, length);
else else
@ -330,42 +335,48 @@ public class FragmentHandler {
int size = (int)DataHelper.fromLong(preprocessed, offset, 2); int size = (int)DataHelper.fromLong(preprocessed, offset, 2);
offset += 2; offset += 2;
boolean isNew = false;
FragmentedMessage msg = null; FragmentedMessage msg = null;
if (fragmented) { if (fragmented) {
synchronized (_fragmentedMessages) { synchronized (_fragmentedMessages) {
msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId)); msg = _fragmentedMessages.get(new Long(messageId));
if (msg == null) { if (msg == null) {
msg = new FragmentedMessage(_context); msg = new FragmentedMessage(_context);
_fragmentedMessages.put(new Long(messageId), msg); _fragmentedMessages.put(new Long(messageId), msg);
isNew = true;
} }
} }
} else { } else {
msg = new FragmentedMessage(_context); msg = new FragmentedMessage(_context);
} }
boolean ok = msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId); if (fragmented) {
if (!ok) return -1; // synchronized is required, fragments may be arriving in different threads
if (msg.isComplete()) { synchronized(msg) {
if (fragmented) { boolean ok = msg.receive(messageId, preprocessed, offset, size, false, router, tunnelId);
synchronized (_fragmentedMessages) { if (!ok) return -1;
_fragmentedMessages.remove(new Long(messageId)); if (msg.isComplete()) {
synchronized (_fragmentedMessages) {
_fragmentedMessages.remove(new Long(messageId));
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), 0, msg);
if (msg.getExpireEvent() == null) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
} }
} }
if (msg.getExpireEvent() != null) } else {
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent()); // synchronized not required if !fragmented
boolean ok = msg.receive(messageId, preprocessed, offset, size, true, router, tunnelId);
if (!ok) return -1;
// always complete, never an expire event
receiveComplete(msg); receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), 0, msg);
}
if (isNew && fragmented && !msg.isComplete()) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
} }
offset += size; offset += size;
@ -397,38 +408,38 @@ public class FragmentHandler {
throw new RuntimeException("Preprocessed message was invalid [messageId =" + messageId + " size=" throw new RuntimeException("Preprocessed message was invalid [messageId =" + messageId + " size="
+ size + " offset=" + offset + " fragment=" + fragmentNum); + size + " offset=" + offset + " fragment=" + fragmentNum);
boolean isNew = false;
FragmentedMessage msg = null; FragmentedMessage msg = null;
synchronized (_fragmentedMessages) { synchronized (_fragmentedMessages) {
msg = (FragmentedMessage)_fragmentedMessages.get(new Long(messageId)); msg = _fragmentedMessages.get(new Long(messageId));
if (msg == null) { if (msg == null) {
msg = new FragmentedMessage(_context); msg = new FragmentedMessage(_context);
_fragmentedMessages.put(new Long(messageId), msg); _fragmentedMessages.put(new Long(messageId), msg);
isNew = true;
} }
} }
boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast); // synchronized is required, fragments may be arriving in different threads
if (!ok) return -1; synchronized(msg) {
boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast);
if (msg.isComplete()) { if (!ok) return -1;
synchronized (_fragmentedMessages) {
_fragmentedMessages.remove(new Long(messageId)); if (msg.isComplete()) {
synchronized (_fragmentedMessages) {
_fragmentedMessages.remove(new Long(messageId));
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
_context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), fragmentNum, msg);
if (msg.getExpireEvent() == null) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
}
} }
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
_context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), fragmentNum, msg);
}
if (isNew && !msg.isComplete()) {
RemoveFailed evt = new RemoveFailed(msg);
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
} }
offset += size; offset += size;
@ -449,8 +460,8 @@ public class FragmentHandler {
if (data == null) if (data == null)
throw new I2NPMessageException("null data"); // fragments already released??? throw new I2NPMessageException("null data"); // fragments already released???
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("RECV(" + data.length + "): " + Base64.encode(data) _log.debug("RECV(" + data.length + "): "); // + Base64.encode(data)
+ " " + _context.sha().calculateHash(data).toBase64()); //+ " " + _context.sha().calculateHash(data).toBase64());
I2NPMessage m = new I2NPMessageHandler(_context).readMessage(data); I2NPMessage m = new I2NPMessageHandler(_context).readMessage(data);
noteReception(m.getUniqueId(), fragmentCount-1, "complete: ");// + msg.toString()); noteReception(m.getUniqueId(), fragmentCount-1, "complete: ");// + msg.toString());
noteCompletion(m.getUniqueId()); noteCompletion(m.getUniqueId());
@ -498,15 +509,17 @@ public class FragmentHandler {
synchronized (_fragmentedMessages) { synchronized (_fragmentedMessages) {
removed = (null != _fragmentedMessages.remove(new Long(_msg.getMessageId()))); removed = (null != _fragmentedMessages.remove(new Long(_msg.getMessageId())));
} }
if (removed && !_msg.getReleased()) { synchronized (_msg) {
_failed++; if (removed && !_msg.getReleased()) {
noteFailure(_msg.getMessageId(), _msg.toString()); _failed++;
if (_log.shouldLog(Log.WARN)) noteFailure(_msg.getMessageId(), _msg.toString());
_log.warn("Dropped failed fragmented message: " + _msg); if (_log.shouldLog(Log.WARN))
_context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime()); _log.warn("Dropped failed fragmented message: " + _msg);
_msg.failed(); _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime());
} else { _msg.failed();
// succeeded before timeout } else {
// succeeded before timeout
}
} }
} }

View File

@ -21,6 +21,8 @@ import net.i2p.util.SimpleTimer;
* Gather fragments of I2NPMessages at a tunnel endpoint, making them available * Gather fragments of I2NPMessages at a tunnel endpoint, making them available
* for reading when complete. * for reading when complete.
* *
* Warning - this is all unsynchronized here - receivers must implement synchronization
*
*/ */
public class FragmentedMessage { public class FragmentedMessage {
private I2PAppContext _context; private I2PAppContext _context;
@ -282,7 +284,7 @@ public class FragmentedMessage {
if (ba != null) if (ba != null)
buf.append(i).append(":").append(ba.getValid()).append(" bytes "); buf.append(i).append(":").append(ba.getValid()).append(" bytes ");
else else
buf.append(i).append(": missing "); buf.append(i).append(":missing ");
} }
buf.append(" highest received: ").append(_highFragmentNum); buf.append(" highest received: ").append(_highFragmentNum);
buf.append(" last received? ").append(_lastReceived); buf.append(" last received? ").append(_lastReceived);

View File

@ -35,8 +35,9 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver {
} }
} }
if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length)) // We do this before the preprocessor now (i.e. before fragmentation)
return -1; //if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
// return -1;
_config.incrementSentMessages(); _config.incrementSentMessages();
TunnelDataMessage msg = new TunnelDataMessage(_context); TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted); msg.setData(encrypted);

View File

@ -1,7 +1,6 @@
package net.i2p.router.tunnel; package net.i2p.router.tunnel;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.util.Log;
/** /**
* Receive the preprocessed data for an inbound gateway, encrypt it, and forward * Receive the preprocessed data for an inbound gateway, encrypt it, and forward
@ -9,16 +8,12 @@ import net.i2p.util.Log;
* *
*/ */
public class InboundSender implements TunnelGateway.Sender { public class InboundSender implements TunnelGateway.Sender {
private I2PAppContext _context;
private Log _log;
private InboundGatewayProcessor _processor; private InboundGatewayProcessor _processor;
static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION; static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
public InboundSender(I2PAppContext ctx, HopConfig config) { public InboundSender(I2PAppContext ctx, HopConfig config) {
_context = ctx; _processor = new InboundGatewayProcessor(ctx, config);
_log = ctx.logManager().getLog(InboundSender.class);
_processor = new InboundGatewayProcessor(_context, config);
} }
public long sendPreprocessed(byte[] preprocessed, TunnelGateway.Receiver receiver) { public long sendPreprocessed(byte[] preprocessed, TunnelGateway.Receiver receiver) {

View File

@ -3,14 +3,16 @@ package net.i2p.router.tunnel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* This is used for all gateways with more than zero hops.
*
* Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or * Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or
* fragmenting them before wrapping them up for tunnel delivery. The flow here * fragmenting them before wrapping them up for tunnel delivery. The flow here
* is: <ol> * is: <ol>
@ -32,7 +34,7 @@ import net.i2p.util.Log;
* *
*/ */
public class PumpedTunnelGateway extends TunnelGateway { public class PumpedTunnelGateway extends TunnelGateway {
private final List _prequeue; private final List<Pending> _prequeue;
private TunnelGatewayPumper _pumper; private TunnelGatewayPumper _pumper;
/** /**
@ -43,7 +45,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off * @param receiver this receives the encrypted message and forwards it off
* to the first hop * to the first hop
*/ */
public PumpedTunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
super(context, preprocessor, sender, receiver); super(context, preprocessor, sender, receiver);
_prequeue = new ArrayList(4); _prequeue = new ArrayList(4);
_pumper = pumper; _pumper = pumper;
@ -78,7 +80,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
* go quickly, rather than blocking its callers on potentially substantial * go quickly, rather than blocking its callers on potentially substantial
* processing. * processing.
*/ */
void pump(List queueBuf) { void pump(List<Pending> queueBuf) {
synchronized (_prequeue) { synchronized (_prequeue) {
if (_prequeue.size() > 0) { if (_prequeue.size() > 0) {
queueBuf.addAll(_prequeue); queueBuf.addAll(_prequeue);
@ -88,7 +90,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
} }
} }
long startAdd = System.currentTimeMillis(); long startAdd = System.currentTimeMillis();
long beforeLock = System.currentTimeMillis(); long beforeLock = startAdd;
long afterAdded = -1; long afterAdded = -1;
boolean delayedFlush = false; boolean delayedFlush = false;
long delayAmount = -1; long delayAmount = -1;
@ -108,7 +110,7 @@ public class PumpedTunnelGateway extends TunnelGateway {
// expire any as necessary, even if its framented // expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) { for (int i = 0; i < _queue.size(); i++) {
Pending m = (Pending)_queue.get(i); Pending m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) { if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m); _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);

View File

@ -0,0 +1,49 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;
/**
* Same as PTG, but check to see if a message should be dropped before queueing it.
* Used for IBGWs.
*
*/
public class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
/** saved so we can note messages that get dropped */
private HopConfig _config;
public ThrottledPumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender,
Receiver receiver, TunnelGatewayPumper pumper, HopConfig config) {
super(context, preprocessor, sender, receiver, pumper);
_config = config;
}
/**
* Possibly drop a message due to bandwidth before adding it to the preprocessor queue.
* We do this here instead of in the InboundGatewayReceiver because it is much smarter to drop
* whole I2NP messages, where we know the message type and length, rather than
* tunnel messages containing I2NP fragments.
*/
@Override
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
//_log.error("IBGW count: " + _config.getProcessedMessagesCount() + " type: " + msg.getType() + " size: " + msg.getMessageSize());
// Hard to do this exactly, but we'll assume 2:1 batching
// for the purpose of estimating outgoing size.
// We assume that it's the outbound bandwidth that is the issue...
int size = Math.max(msg.getMessageSize(), 1024/2);
if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW " + msg.getType(), size)) {
// this overstates the stat somewhat, but ok for now
int kb = (size + 1023) / 1024;
for (int i = 0; i < kb; i++)
_config.incrementProcessedMessages();
return;
}
super.add(msg, toRouter,toTunnel);
}
}

View File

@ -17,6 +17,7 @@ import net.i2p.util.Log;
* each of those out. This does not coallesce message fragments or delay for more * each of those out. This does not coallesce message fragments or delay for more
* optimal throughput. * optimal throughput.
* *
* See FragmentHandler Javadoc for tunnel message fragment format
*/ */
public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
protected RouterContext _context; protected RouterContext _context;
@ -175,6 +176,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
//_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength); //_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength);
int paddingRemaining = numPadBytes; int paddingRemaining = numPadBytes;
// FIXME inefficient, waste of 3/4 of the entropy
// Should get a byte array of random, change all the zeros to something else, and ArrayCopy
while (paddingRemaining > 0) { while (paddingRemaining > 0) {
byte b = (byte)(_context.random().nextInt() & 0xFF); byte b = (byte)(_context.random().nextInt() & 0xFF);
if (b != 0x00) { if (b != 0x00) {
@ -196,7 +199,11 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
private static final byte MASK_TYPE = FragmentHandler.MASK_TYPE; private static final byte MASK_TYPE = FragmentHandler.MASK_TYPE;
/** is this the first of a fragmented message? */ /** is this the first of a fragmented message? */
private static final byte MASK_FRAGMENTED = FragmentHandler.MASK_FRAGMENTED; private static final byte MASK_FRAGMENTED = FragmentHandler.MASK_FRAGMENTED;
/** are there follow up headers? */
/**
* are there follow up headers?
* @deprecated unimplemented
*/
private static final byte MASK_EXTENDED = FragmentHandler.MASK_EXTENDED; private static final byte MASK_EXTENDED = FragmentHandler.MASK_EXTENDED;
private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5); private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5);
private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5); private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
@ -311,19 +318,30 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
return offset; return offset;
} }
/**
* @return generally 3 or 35 or 39 for first fragment, 7 for subsequent fragments.
*
* Does NOT include 4 for the message ID if the message will be fragmented;
* call getInstructionAugmentationSize() for that.
*/
protected int getInstructionsSize(TunnelGateway.Pending msg) { protected int getInstructionsSize(TunnelGateway.Pending msg) {
if (msg.getFragmentNumber() > 0) if (msg.getFragmentNumber() > 0)
return 7; return 7;
// control byte
int header = 1; int header = 1;
// tunnel ID
if (msg.getToTunnel() != null) if (msg.getToTunnel() != null)
header += 4; header += 4;
// router hash
if (msg.getToRouter() != null) if (msg.getToRouter() != null)
header += 32; header += 32;
// size
header += 2; header += 2;
return header; return header;
} }
/** @return 0 or 4 */
protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) { protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) {
int payloadLength = msg.getData().length - msg.getOffset(); int payloadLength = msg.getData().length - msg.getOffset();
if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) { if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {

View File

@ -240,7 +240,7 @@ public class TunnelDispatcher implements Service {
TunnelGateway.Sender sender = new InboundSender(_context, cfg); TunnelGateway.Sender sender = new InboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg); TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg);
//TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); TunnelGateway gw = new ThrottledPumpedTunnelGateway(_context, preproc, sender, receiver, _pumper, cfg);
TunnelId recvId = cfg.getReceiveTunnel(); TunnelId recvId = cfg.getReceiveTunnel();
_inboundGateways.put(recvId, gw); _inboundGateways.put(recvId, gw);
_participatingConfig.put(recvId, cfg); _participatingConfig.put(recvId, cfg);

View File

@ -3,12 +3,12 @@ package net.i2p.router.tunnel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelGatewayMessage; import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
@ -34,9 +34,9 @@ import net.i2p.util.SimpleTimer;
* *
*/ */
public class TunnelGateway { public class TunnelGateway {
protected I2PAppContext _context; protected RouterContext _context;
protected Log _log; protected Log _log;
protected final List _queue; protected final List<Pending> _queue;
protected QueuePreprocessor _preprocessor; protected QueuePreprocessor _preprocessor;
protected Sender _sender; protected Sender _sender;
protected Receiver _receiver; protected Receiver _receiver;
@ -53,7 +53,7 @@ public class TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off * @param receiver this receives the encrypted message and forwards it off
* to the first hop * to the first hop
*/ */
public TunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) { public TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
_context = context; _context = context;
_log = context.logManager().getLog(getClass()); _log = context.logManager().getLog(getClass());
_queue = new ArrayList(4); _queue = new ArrayList(4);
@ -110,7 +110,7 @@ public class TunnelGateway {
// expire any as necessary, even if its framented // expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) { for (int i = 0; i < _queue.size(); i++) {
Pending m = (Pending)_queue.get(i); Pending m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) { if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m); _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);
@ -280,11 +280,11 @@ public class TunnelGateway {
long beforeLock = _context.clock().now(); long beforeLock = _context.clock().now();
long afterChecked = -1; long afterChecked = -1;
long delayAmount = -1; long delayAmount = -1;
if (_queue.size() > 10000) // stay out of the synchronized block //if (_queue.size() > 10000) // stay out of the synchronized block
System.out.println("foo!"); // System.out.println("foo!");
synchronized (_queue) { synchronized (_queue) {
if (_queue.size() > 10000) // stay in the synchronized block //if (_queue.size() > 10000) // stay in the synchronized block
System.out.println("foo!"); // System.out.println("foo!");
afterChecked = _context.clock().now(); afterChecked = _context.clock().now();
if (_queue.size() > 0) { if (_queue.size() > 0) {
if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) ) if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )

View File

@ -5,7 +5,6 @@ import java.util.List;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/** /**
* run through the tunnel gateways that have had messages added to them and push * run through the tunnel gateways that have had messages added to them and push
@ -13,14 +12,12 @@ import net.i2p.util.Log;
*/ */
public class TunnelGatewayPumper implements Runnable { public class TunnelGatewayPumper implements Runnable {
private RouterContext _context; private RouterContext _context;
private Log _log; private final List<PumpedTunnelGateway> _wantsPumping;
private final List _wantsPumping;
private boolean _stop; private boolean _stop;
/** Creates a new instance of TunnelGatewayPumper */ /** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) { public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(getClass());
_wantsPumping = new ArrayList(64); _wantsPumping = new ArrayList(64);
_stop = false; _stop = false;
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
@ -40,12 +37,12 @@ public class TunnelGatewayPumper implements Runnable {
public void run() { public void run() {
PumpedTunnelGateway gw = null; PumpedTunnelGateway gw = null;
List queueBuf = new ArrayList(32); List<TunnelGateway.Pending> queueBuf = new ArrayList(32);
while (!_stop) { while (!_stop) {
try { try {
synchronized (_wantsPumping) { synchronized (_wantsPumping) {
if (_wantsPumping.size() > 0) if (_wantsPumping.size() > 0)
gw = (PumpedTunnelGateway)_wantsPumping.remove(0); gw = _wantsPumping.remove(0);
else else
_wantsPumping.wait(); _wantsPumping.wait();
} }

View File

@ -55,6 +55,9 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
// remove us from the pool (but not the dispatcher) so that we aren't // remove us from the pool (but not the dispatcher) so that we aren't
// selected again. _expireJob is left to do its thing, in case there // selected again. _expireJob is left to do its thing, in case there
// are any straggling messages coming down the tunnel // are any straggling messages coming down the tunnel
//
// Todo: Maybe delay or prevent failing if we are near tunnel build capacity,
// to prevent collapse (loss of all tunnels)
_pool.tunnelFailed(this); _pool.tunnelFailed(this);
if (_testJob != null) // just in case... if (_testJob != null) // just in case...
_context.jobQueue().removeJob(_testJob); _context.jobQueue().removeJob(_testJob);