* Tunnels:

- Increase timeouts on all deferred netdb lookups to 15s; add lookup stats
    - Cleanups, javadocs, log tweaks
This commit is contained in:
zzz
2011-08-29 17:51:00 +00:00
parent 92ffea2237
commit 20855c9c44
9 changed files with 167 additions and 90 deletions

View File

@ -46,25 +46,12 @@ import net.i2p.util.Log;
*/ */
public class BatchedPreprocessor extends TrivialPreprocessor { public class BatchedPreprocessor extends TrivialPreprocessor {
private long _pendingSince; private long _pendingSince;
private String _name; private final String _name;
public BatchedPreprocessor(RouterContext ctx, String name) { public BatchedPreprocessor(RouterContext ctx, String name) {
super(ctx); super(ctx);
_log = ctx.logManager().getLog(BatchedPreprocessor.class);
_name = name; _name = name;
_pendingSince = 0; // all createRateStat() moved to TunnelDispatcher
ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchSmallFragments", "How many outgoing pad bytes are in small fragments?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.batchFullFragments", "How many outgoing tunnel messages use the full data area?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
} }
/** 1003 */ /** 1003 */
@ -426,7 +413,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
int later = msg.getData().length - msg.getOffset(); int later = msg.getData().length - msg.getOffset();
if (later > 0) if (later > 0)
frag--; frag--;
_log.debug("writing " + msg.getMessageId() + " fragment " + frag if (_log.shouldLog(Log.DEBUG))
_log.debug("writing " + msg.getMessageId() + " fragment " + frag
+ ", ending at " + offset + " prev " + prevOffset + ", ending at " + offset + " prev " + prevOffset
+ " leaving " + later + " bytes for later"); + " leaving " + later + " bytes for later");
} }

View File

@ -7,33 +7,43 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
/** /**
* * Handle messages at the IBGW
*/ */
public class InboundGatewayReceiver implements TunnelGateway.Receiver { class InboundGatewayReceiver implements TunnelGateway.Receiver {
private RouterContext _context; private final RouterContext _context;
private HopConfig _config; private final HopConfig _config;
private RouterInfo _target; private RouterInfo _target;
private static final long MAX_LOOKUP_TIME = 15*1000;
public InboundGatewayReceiver(RouterContext ctx, HopConfig cfg) { public InboundGatewayReceiver(RouterContext ctx, HopConfig cfg) {
_context = ctx; _context = ctx;
_config = cfg; _config = cfg;
// all createRateStat() in TunnelDispatcher
} }
public long receiveEncrypted(byte[] encrypted) { public long receiveEncrypted(byte[] encrypted) {
return receiveEncrypted(encrypted, false); return receiveEncrypted(encrypted, false);
} }
public long receiveEncrypted(byte[] encrypted, boolean alreadySearched) { public long receiveEncrypted(byte[] encrypted, boolean alreadySearched) {
if (!alreadySearched) if (!alreadySearched)
_config.incrementProcessedMessages(); _config.incrementProcessedMessages();
if (_target == null) { if (_target == null) {
_target = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); _target = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (_target == null) { if (_target == null) {
// It should be rare to forget the router info for the next peer
ReceiveJob j = null; ReceiveJob j = null;
if (!alreadySearched) if (alreadySearched)
_context.statManager().addRateData("tunnel.inboundLookupSuccess", 0, 0);
else
j = new ReceiveJob(_context, encrypted); j = new ReceiveJob(_context, encrypted);
_context.netDb().lookupRouterInfo(_config.getSendTo(), j, j, 5*1000); _context.netDb().lookupRouterInfo(_config.getSendTo(), j, j, MAX_LOOKUP_TIME);
return -1; return -1;
} }
} }
if (alreadySearched)
_context.statManager().addRateData("tunnel.inboundLookupSuccess", 1, 0);
// We do this before the preprocessor now (i.e. before fragmentation) // We do this before the preprocessor now (i.e. before fragmentation)
//if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length)) //if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
@ -53,12 +63,15 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver {
} }
private class ReceiveJob extends JobImpl { private class ReceiveJob extends JobImpl {
private byte[] _encrypted; private final byte[] _encrypted;
public ReceiveJob(RouterContext ctx, byte data[]) { public ReceiveJob(RouterContext ctx, byte data[]) {
super(ctx); super(ctx);
_encrypted = data; _encrypted = data;
} }
public String getName() { return "lookup first hop"; }
public String getName() { return "IBGW lookup first hop"; }
public void runJob() { public void runJob() {
receiveEncrypted(_encrypted, true); receiveEncrypted(_encrypted, true);
} }

View File

@ -19,12 +19,13 @@ public class OutboundMessageDistributor {
private final int _priority; private final int _priority;
private final Log _log; private final Log _log;
private static final int MAX_DISTRIBUTE_TIME = 10*1000; private static final long MAX_DISTRIBUTE_TIME = 15*1000;
public OutboundMessageDistributor(RouterContext ctx, int priority) { public OutboundMessageDistributor(RouterContext ctx, int priority) {
_context = ctx; _context = ctx;
_priority = priority; _priority = priority;
_log = ctx.logManager().getLog(OutboundMessageDistributor.class); _log = ctx.logManager().getLog(OutboundMessageDistributor.class);
// all createRateStat() in TunnelDispatcher
} }
public void distribute(I2NPMessage msg, Hash target) { public void distribute(I2NPMessage msg, Hash target) {
@ -34,10 +35,12 @@ public class OutboundMessageDistributor {
public void distribute(I2NPMessage msg, Hash target, TunnelId tunnel) { public void distribute(I2NPMessage msg, Hash target, TunnelId tunnel) {
RouterInfo info = _context.netDb().lookupRouterInfoLocally(target); RouterInfo info = _context.netDb().lookupRouterInfoLocally(target);
if (info == null) { if (info == null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO))
_log.debug("outbound distributor to " + target.toBase64().substring(0,4) _log.info("outbound distributor to " + target
+ "." + (tunnel != null ? tunnel.getTunnelId() + "" : "") + "." + (tunnel != null ? tunnel.getTunnelId() + "" : "")
+ ": no info locally, searching..."); + ": no info locally, searching...");
// TODO - should we set the search timeout based on the message timeout,
// or is that a bad idea due to clock skews?
_context.netDb().lookupRouterInfo(target, new DistributeJob(_context, msg, target, tunnel), null, MAX_DISTRIBUTE_TIME); _context.netDb().lookupRouterInfo(target, new DistributeJob(_context, msg, target, tunnel), null, MAX_DISTRIBUTE_TIME);
return; return;
} else { } else {
@ -68,7 +71,7 @@ public class OutboundMessageDistributor {
out.setPriority(_priority); out.setPriority(_priority);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("queueing outbound message to " + target.getIdentity().calculateHash().toBase64().substring(0,4)); _log.debug("queueing outbound message to " + target.getIdentity().calculateHash());
_context.outNetMessagePool().add(out); _context.outNetMessagePool().add(out);
} }
} }
@ -85,24 +88,26 @@ public class OutboundMessageDistributor {
_tunnel = id; _tunnel = id;
} }
public String getName() { return "Distribute outbound message"; } public String getName() { return "OBEP distribute after lookup"; }
public void runJob() { public void runJob() {
RouterInfo info = getContext().netDb().lookupRouterInfoLocally(_target); RouterInfo info = getContext().netDb().lookupRouterInfoLocally(_target);
int stat;
if (info != null) { if (info != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("outbound distributor to " + _target.toBase64().substring(0,4) _log.debug("outbound distributor to " + _target
+ "." + (_tunnel != null ? _tunnel.getTunnelId() + "" : "") + "." + (_tunnel != null ? _tunnel.getTunnelId() + "" : "")
+ ": found on search"); + ": found on search");
distribute(_message, info, _tunnel); distribute(_message, info, _tunnel);
stat = 1;
} else { } else {
// TODO add a stat here
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("outbound distributor to " + _target.toBase64().substring(0,4) _log.warn("outbound distributor to " + _target
+ "." + (_tunnel != null ? _tunnel.getTunnelId() + "" : "") + "." + (_tunnel != null ? _tunnel.getTunnelId() + "" : "")
+ ": NOT found on search"); + ": NOT found on search");
stat = 0;
} }
_context.statManager().addRateData("tunnel.distributeLookupSuccess", stat, 0);
} }
} }
} }

View File

@ -8,6 +8,7 @@ import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* We are the outbound gateway - we created this outbound tunnel.
* Receive the outbound message after it has been preprocessed and encrypted, * Receive the outbound message after it has been preprocessed and encrypted,
* then forward it on to the first hop in the tunnel. * then forward it on to the first hop in the tunnel.
* *
@ -18,11 +19,14 @@ class OutboundReceiver implements TunnelGateway.Receiver {
private final TunnelCreatorConfig _config; private final TunnelCreatorConfig _config;
private RouterInfo _nextHopCache; private RouterInfo _nextHopCache;
private static final long MAX_LOOKUP_TIME = 15*1000;
public OutboundReceiver(RouterContext ctx, TunnelCreatorConfig cfg) { public OutboundReceiver(RouterContext ctx, TunnelCreatorConfig cfg) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(OutboundReceiver.class); _log = ctx.logManager().getLog(OutboundReceiver.class);
_config = cfg; _config = cfg;
_nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1)); _nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1));
// all createRateStat() in TunnelDispatcher
} }
public long receiveEncrypted(byte encrypted[]) { public long receiveEncrypted(byte encrypted[]) {
@ -40,11 +44,12 @@ class OutboundReceiver implements TunnelGateway.Receiver {
send(msg, ri); send(msg, ri);
return msg.getUniqueId(); return msg.getUniqueId();
} else { } else {
// TODO add a stat here // It should be rare to forget the router info for a peer in our own tunnel.
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("lookup of " + _config.getPeer(1).toBase64().substring(0,4) _log.warn("lookup of " + _config.getPeer(1)
+ " required for " + msg); + " required for " + msg);
_context.netDb().lookupRouterInfo(_config.getPeer(1), new SendJob(_context, msg), new FailedJob(_context), 10*1000); _context.netDb().lookupRouterInfo(_config.getPeer(1), new SendJob(_context, msg),
new FailedJob(_context), MAX_LOOKUP_TIME);
return -1; return -1;
} }
} }
@ -69,17 +74,22 @@ class OutboundReceiver implements TunnelGateway.Receiver {
_msg = msg; _msg = msg;
} }
public String getName() { return "forward a tunnel message"; } public String getName() { return "OBGW send after lookup"; }
public void runJob() { public void runJob() {
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1)); RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1));
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("lookup of " + _config.getPeer(1).toBase64().substring(0,4) _log.debug("lookup of " + _config.getPeer(1)
+ " successful? " + (ri != null)); + " successful? " + (ri != null));
int stat;
if (ri != null) { if (ri != null) {
_nextHopCache = ri; _nextHopCache = ri;
send(_msg, ri); send(_msg, ri);
stat = 1;
} else {
stat = 0;
} }
_context.statManager().addRateData("tunnel.outboundLookupSuccess", stat, 0);
} }
} }
@ -88,13 +98,13 @@ class OutboundReceiver implements TunnelGateway.Receiver {
super(ctx); super(ctx);
} }
public String getName() { return "failed looking for our outbound gateway"; } public String getName() { return "OBGW lookup fail"; }
public void runJob() { public void runJob() {
// TODO add a stat here
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("lookup of " + _config.getPeer(1).toBase64().substring(0,4) _log.warn("lookup of " + _config.getPeer(1)
+ " failed for " + _config); + " failed for " + _config);
_context.statManager().addRateData("tunnel.outboundLookupSuccess", 0, 0);
} }
} }
} }

View File

@ -20,8 +20,8 @@ import net.i2p.util.Log;
* See FragmentHandler Javadoc for tunnel message fragment format * See FragmentHandler Javadoc for tunnel message fragment format
*/ */
public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
protected RouterContext _context; protected final RouterContext _context;
protected Log _log; protected final Log _log;
public static final int PREPROCESSED_SIZE = 1024; public static final int PREPROCESSED_SIZE = 1024;
protected static final int IV_SIZE = HopProcessor.IV_LENGTH; protected static final int IV_SIZE = HopProcessor.IV_LENGTH;
@ -39,7 +39,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
public TrivialPreprocessor(RouterContext ctx) { public TrivialPreprocessor(RouterContext ctx) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(TrivialPreprocessor.class); _log = ctx.logManager().getLog(getClass());
} }
/** how long do we want to wait before flushing */ /** how long do we want to wait before flushing */

View File

@ -10,14 +10,12 @@ import net.i2p.router.RouterContext;
* @deprecated unused * @deprecated unused
*/ */
public class TrivialRouterPreprocessor extends TrivialPreprocessor { public class TrivialRouterPreprocessor extends TrivialPreprocessor {
private RouterContext _routerContext;
public TrivialRouterPreprocessor(RouterContext ctx) { public TrivialRouterPreprocessor(RouterContext ctx) {
super(ctx); super(ctx);
_routerContext = ctx;
} }
protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds) { protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds) {
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, null); _context.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, null);
} }
} }

View File

@ -52,9 +52,6 @@ public class TunnelDispatcher implements Service {
_participants = new ConcurrentHashMap(); _participants = new ConcurrentHashMap();
_inboundGateways = new ConcurrentHashMap(); _inboundGateways = new ConcurrentHashMap();
_participatingConfig = new ConcurrentHashMap(); _participatingConfig = new ConcurrentHashMap();
_lastParticipatingExpiration = 0;
_lastDropTime = 0;
_validator = null;
_pumper = new TunnelGatewayPumper(ctx); _pumper = new TunnelGatewayPumper(ctx);
_leaveJob = new LeaveTunnel(ctx); _leaveJob = new LeaveTunnel(ctx);
ctx.statManager().createRequiredRateStat("tunnel.participatingTunnels", ctx.statManager().createRequiredRateStat("tunnel.participatingTunnels",
@ -107,7 +104,7 @@ public class TunnelDispatcher implements Service {
new long[] { 60*1000l, 60*60*1000l }); new long[] { 60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.dispatchOutboundZeroHopTime", ctx.statManager().createRateStat("tunnel.dispatchOutboundZeroHopTime",
"How long it takes to dispatch an outbound message through a zero hop tunnel", "Tunnels", "How long it takes to dispatch an outbound message through a zero hop tunnel", "Tunnels",
new long[] { 60*1000l, 60*60*1000l }); new long[] { 60*60*1000l });
ctx.statManager().createRequiredRateStat("tunnel.participatingBandwidth", ctx.statManager().createRequiredRateStat("tunnel.participatingBandwidth",
"Participating traffic received (Bytes/sec)", "Tunnels", "Participating traffic received (Bytes/sec)", "Tunnels",
new long[] { 60*1000l, 60*10*1000l }); new long[] { 60*1000l, 60*10*1000l });
@ -129,6 +126,27 @@ public class TunnelDispatcher implements Service {
ctx.statManager().createRateStat("tunnel.failedPartially", ctx.statManager().createRateStat("tunnel.failedPartially",
"How many messages are sent through a tunnel that only failed partially (period == failures)?", "Tunnels", "How many messages are sent through a tunnel that only failed partially (period == failures)?", "Tunnels",
new long[] { 60*1000l, 10*60*1000l, 60*60*1000l }); new long[] { 60*1000l, 10*60*1000l, 60*60*1000l });
// following are for BatchedPreprocessor
ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("tunnel.batchSmallFragments", "How many outgoing pad bytes are in small fragments?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.batchFullFragments", "How many outgoing tunnel messages use the full data area?",
"Tunnels", new long[] { 10*60*1000l, 60*60*1000l });
ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
// following is for OutboundMessageDistributor
ctx.statManager().createRateStat("tunnel.distributeLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
// following is for OutboundReceiver
ctx.statManager().createRateStat("tunnel.outboundLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
// following is for InboundGatewayReceiver
ctx.statManager().createRateStat("tunnel.inboundLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
// following is for TunnelParticipant
ctx.statManager().createRateStat("tunnel.participantLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
} }
/** for IBGW */ /** for IBGW */
@ -171,6 +189,7 @@ public class TunnelDispatcher implements Service {
_context.messageHistory().tunnelJoined("outboundZeroHop", cfg); _context.messageHistory().tunnelJoined("outboundZeroHop", cfg);
} }
} }
/** /**
* We are the inbound endpoint - we created this tunnel * We are the inbound endpoint - we created this tunnel
*/ */
@ -210,6 +229,7 @@ public class TunnelDispatcher implements Service {
_lastParticipatingExpiration = cfg.getExpiration(); _lastParticipatingExpiration = cfg.getExpiration();
_leaveJob.add(cfg); _leaveJob.add(cfg);
} }
/** /**
* We are the outbound endpoint in this tunnel, and did not create it * We are the outbound endpoint in this tunnel, and did not create it
* *

View File

@ -18,21 +18,34 @@ import net.i2p.util.Log;
* which it will then selectively forward. * which it will then selectively forward.
*/ */
public class TunnelParticipant { public class TunnelParticipant {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private HopConfig _config; private final HopConfig _config;
private HopProcessor _processor; private final HopProcessor _processor;
private InboundEndpointProcessor _inboundEndpointProcessor; private final InboundEndpointProcessor _inboundEndpointProcessor;
private InboundMessageDistributor _inboundDistributor; private final InboundMessageDistributor _inboundDistributor;
private FragmentHandler _handler; private final FragmentHandler _handler;
private RouterInfo _nextHopCache; private RouterInfo _nextHopCache;
private static final long MAX_LOOKUP_TIME = 15*1000;
/** for next hop when a tunnel is first created */
private static final long LONG_MAX_LOOKUP_TIME = 30*1000;
/** not an inbound endpoint */
public TunnelParticipant(RouterContext ctx, HopConfig config, HopProcessor processor) { public TunnelParticipant(RouterContext ctx, HopConfig config, HopProcessor processor) {
this(ctx, config, processor, null); this(ctx, config, processor, null);
} }
/** inbound endpoint */
public TunnelParticipant(RouterContext ctx, InboundEndpointProcessor inEndProc) { public TunnelParticipant(RouterContext ctx, InboundEndpointProcessor inEndProc) {
this(ctx, null, null, inEndProc); this(ctx, null, null, inEndProc);
} }
/**
* @param config may be null (inbound endpoint if null)
* @param processor may be null (inbound endpoint if null)
* @param inEndProc may be null (inbound endpoint if non-null)
*/
private TunnelParticipant(RouterContext ctx, HopConfig config, HopProcessor processor, InboundEndpointProcessor inEndProc) { private TunnelParticipant(RouterContext ctx, HopConfig config, HopProcessor processor, InboundEndpointProcessor inEndProc) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(TunnelParticipant.class); _log = ctx.logManager().getLog(TunnelParticipant.class);
@ -40,23 +53,32 @@ public class TunnelParticipant {
_processor = processor; _processor = processor;
if ( (config == null) || (config.getSendTo() == null) ) if ( (config == null) || (config.getSendTo() == null) )
_handler = new RouterFragmentHandler(ctx, new DefragmentedHandler()); _handler = new RouterFragmentHandler(ctx, new DefragmentedHandler());
else
_handler = null; // final
_inboundEndpointProcessor = inEndProc; _inboundEndpointProcessor = inEndProc;
if (inEndProc != null) if (inEndProc != null)
_inboundDistributor = new InboundMessageDistributor(ctx, inEndProc.getDestination()); _inboundDistributor = new InboundMessageDistributor(ctx, inEndProc.getDestination());
else
_inboundDistributor = null; // final
if ( (_config != null) && (_config.getSendTo() != null) ) { if ( (_config != null) && (_config.getSendTo() != null) ) {
_nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); _nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (_nextHopCache == null) if (_nextHopCache == null)
_context.netDb().lookupRouterInfo(_config.getSendTo(), new Found(_context), null, 60*1000); _context.netDb().lookupRouterInfo(_config.getSendTo(), new Found(_context), null, LONG_MAX_LOOKUP_TIME);
} }
// all createRateStat() in TunnelDispatcher
} }
private class Found extends JobImpl { private class Found extends JobImpl {
public Found(RouterContext ctx) { super(ctx); } public Found(RouterContext ctx) { super(ctx); }
public String getName() { return "Next hop info found"; } public String getName() { return "Next hop info found"; }
public void runJob() { public void runJob() {
if (_nextHopCache == null) if (_nextHopCache == null) {
_nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); _nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
// nothing for failure since fail job is null
_context.statManager().addRateData("tunnel.participantLookupSuccess", 1, 0);
}
} }
} }
@ -84,17 +106,19 @@ public class TunnelParticipant {
ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (ri != null) { if (ri != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Send off to nextHop directly (" + _config.getSendTo().toBase64().substring(0,4) _log.debug("Send off to nextHop directly (" + _config.getSendTo()
+ " for " + msg); + " for " + msg);
send(_config, msg, ri); send(_config, msg, ri);
// see comments below // see comments below
//if (_config != null) //if (_config != null)
// incrementThroughput(_config.getReceiveFrom()); // incrementThroughput(_config.getReceiveFrom());
} else { } else {
// It should be rare to forget the router info for the next peer
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4) _log.warn("Lookup the nextHop (" + _config.getSendTo()
+ " for " + msg); + " for " + msg);
_context.netDb().lookupRouterInfo(_config.getSendTo(), new SendJob(_context, msg), new TimeoutJob(_context, msg), 10*1000); _context.netDb().lookupRouterInfo(_config.getSendTo(), new SendJob(_context, msg),
new TimeoutJob(_context, msg), MAX_LOOKUP_TIME);
} }
} else { } else {
_inboundEndpointProcessor.getConfig().incrementProcessedMessages(); _inboundEndpointProcessor.getConfig().incrementProcessedMessages();
@ -139,6 +163,7 @@ public class TunnelParticipant {
else else
return 0; return 0;
} }
public int getFailedCount() { public int getFailedCount() {
if (_handler != null) if (_handler != null)
return _handler.getFailedCount(); return _handler.getFailedCount();
@ -176,36 +201,46 @@ public class TunnelParticipant {
} }
private class SendJob extends JobImpl { private class SendJob extends JobImpl {
private TunnelDataMessage _msg; private final TunnelDataMessage _msg;
public SendJob(RouterContext ctx, TunnelDataMessage msg) { public SendJob(RouterContext ctx, TunnelDataMessage msg) {
super(ctx); super(ctx);
_msg = msg; _msg = msg;
} }
public String getName() { return "forward a tunnel message"; }
public String getName() { return "Participant send after lookup"; }
public void runJob() { public void runJob() {
if (_nextHopCache != null) { if (_nextHopCache != null) {
send(_config, _msg, _nextHopCache); send(_config, _msg, _nextHopCache);
} else { } else {
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
int stat;
if (ri != null) { if (ri != null) {
_nextHopCache = ri; _nextHopCache = ri;
send(_config, _msg, ri); send(_config, _msg, ri);
stat = 1;
} else { } else {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.WARN))
_log.error("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4) _log.warn("Lookup the nextHop (" + _config.getSendTo()
+ " failed! where do we go for " + _config + "? msg dropped: " + _msg); + " failed! where do we go for " + _config + "? msg dropped: " + _msg);
stat = 0;
} }
_context.statManager().addRateData("tunnel.participantLookupSuccess", stat, 0);
} }
} }
} }
private class TimeoutJob extends JobImpl { private class TimeoutJob extends JobImpl {
private TunnelDataMessage _msg; private final TunnelDataMessage _msg;
public TimeoutJob(RouterContext ctx, TunnelDataMessage msg) { public TimeoutJob(RouterContext ctx, TunnelDataMessage msg) {
super(ctx); super(ctx);
_msg = msg; _msg = msg;
} }
public String getName() { return "timeout looking for next hop info"; }
public String getName() { return "Participant next hop lookup timeout"; }
public void runJob() { public void runJob() {
if (_nextHopCache != null) if (_nextHopCache != null)
return; return;
@ -213,14 +248,15 @@ public class TunnelParticipant {
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo());
if (ri != null) { if (ri != null) {
_nextHopCache = ri; _nextHopCache = ri;
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.WARN))
_log.error("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4) _log.warn("Lookup the nextHop (" + _config.getSendTo()
+ " failed, but we found it!! where do we go for " + _config + "? msg dropped: " + _msg); + " failed, but we found it!! where do we go for " + _config + "? msg dropped: " + _msg);
} else { } else {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.WARN))
_log.error("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4) _log.warn("Lookup the nextHop (" + _config.getSendTo()
+ " failed! where do we go for " + _config + "? msg dropped: " + _msg); + " failed! where do we go for " + _config + "? msg dropped: " + _msg);
} }
_context.statManager().addRateData("tunnel.participantLookupSuccess", 0, 0);
} }
} }

View File

@ -90,6 +90,7 @@ class BuildHandler {
_context.statManager().createRateStat("tunnel.receiveRejectionCritical", "How often we are rejected due to critical failure?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("tunnel.receiveRejectionCritical", "How often we are rejected due to critical failure?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("tunnel.corruptBuildReply", "", "Tunnels", new long[] { 24*60*60*1000l }); _context.statManager().createRateStat("tunnel.corruptBuildReply", "", "Tunnels", new long[] { 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
_processor = new BuildMessageProcessor(ctx); _processor = new BuildMessageProcessor(ctx);
_throttler = new ParticipatingThrottler(ctx); _throttler = new ParticipatingThrottler(ctx);
@ -104,7 +105,7 @@ class BuildHandler {
} }
private static final int MAX_HANDLE_AT_ONCE = 2; private static final int MAX_HANDLE_AT_ONCE = 2;
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 5*1000; private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
/** /**
* Blocking call to handle a few of the pending inbound requests, returning how many * Blocking call to handle a few of the pending inbound requests, returning how many
@ -197,7 +198,7 @@ class BuildHandler {
_context.statManager().addRateData("tunnel.tierReject" + bwTier, 1, 0); _context.statManager().addRateData("tunnel.tierReject" + bwTier, 1, 0);
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(msg.getUniqueId() + ": Peer " + peer.toBase64() + " replied with status " + howBad); _log.info(msg.getUniqueId() + ": Peer " + peer + " replied with status " + howBad);
if (howBad == 0) { if (howBad == 0) {
// w3wt // w3wt
@ -300,8 +301,9 @@ class BuildHandler {
if (nextPeerInfo == null) { if (nextPeerInfo == null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + state.msg.getUniqueId() + "/" + req.readReceiveTunnelId() + "/" + req.readNextTunnelId() _log.debug("Request " + state.msg.getUniqueId() + "/" + req.readReceiveTunnelId() + "/" + req.readNextTunnelId()
+ " handled, looking for the next peer " + nextPeer.toBase64()); + " handled, looking for the next peer " + nextPeer);
_context.netDb().lookupRouterInfo(nextPeer, new HandleReq(_context, state, req, nextPeer), new TimeoutReq(_context, state, req, nextPeer), NEXT_HOP_LOOKUP_TIMEOUT); _context.netDb().lookupRouterInfo(nextPeer, new HandleReq(_context, state, req, nextPeer),
new TimeoutReq(_context, state, req, nextPeer), NEXT_HOP_LOOKUP_TIMEOUT);
return -1; return -1;
} else { } else {
long beforeHandle = System.currentTimeMillis(); long beforeHandle = System.currentTimeMillis();
@ -309,7 +311,7 @@ class BuildHandler {
long handleTime = System.currentTimeMillis() - beforeHandle; long handleTime = System.currentTimeMillis() - beforeHandle;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + state.msg.getUniqueId() + " handled and we know the next peer " _log.debug("Request " + state.msg.getUniqueId() + " handled and we know the next peer "
+ nextPeer.toBase64() + " after " + handleTime + nextPeer + " after " + handleTime
+ "/" + decryptTime + "/" + lookupTime + "/" + timeSinceReceived); + "/" + decryptTime + "/" + lookupTime + "/" + timeSinceReceived);
return handleTime; return handleTime;
} }
@ -344,13 +346,17 @@ class BuildHandler {
public String getName() { return "Deferred tunnel join processing"; } public String getName() { return "Deferred tunnel join processing"; }
public void runJob() { public void runJob() {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + _state.msg.getUniqueId() + " handled with a successful deferred lookup for the next peer " + _nextPeer.toBase64()); _log.debug("Request " + _state.msg.getUniqueId() + " handled with a successful deferred lookup for the next peer " + _nextPeer);
RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(_nextPeer); RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(_nextPeer);
if (ri != null) if (ri != null) {
handleReq(ri, _state, _req, _nextPeer); handleReq(ri, _state, _req, _nextPeer);
else if (_log.shouldLog(Log.WARN)) getContext().statManager().addRateData("tunnel.buildLookupSuccess", 1, 0);
_log.warn("Deferred successfully, but we couldnt find " + _nextPeer.toBase64() + "?"); } else {
if (_log.shouldLog(Log.WARN))
_log.warn("Deferred successfully, but we couldnt find " + _nextPeer);
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0, 0);
}
} }
} }
@ -367,6 +373,7 @@ class BuildHandler {
public String getName() { return "Timeout looking for next peer for tunnel join"; } public String getName() { return "Timeout looking for next peer for tunnel join"; }
public void runJob() { public void runJob() {
getContext().statManager().addRateData("tunnel.rejectTimeout", 1, 0); getContext().statManager().addRateData("tunnel.rejectTimeout", 1, 0);
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0, 0);
// logging commented out so class can be static // logging commented out so class can be static
//if (_log.shouldLog(Log.WARN)) //if (_log.shouldLog(Log.WARN))
// _log.warn("Request " + _state.msg.getUniqueId() // _log.warn("Request " + _state.msg.getUniqueId()
@ -374,7 +381,7 @@ class BuildHandler {
// ??? should we blame the peer here? getContext().profileManager().tunnelTimedOut(_nextPeer); // ??? should we blame the peer here? getContext().profileManager().tunnelTimedOut(_nextPeer);
getContext().messageHistory().tunnelRejected(_state.fromHash, new TunnelId(_req.readReceiveTunnelId()), _nextPeer, getContext().messageHistory().tunnelRejected(_state.fromHash, new TunnelId(_req.readReceiveTunnelId()), _nextPeer,
"rejected because we couldn't find " + _nextPeer.toBase64() + ": " + "rejected because we couldn't find " + _nextPeer + ": " +
_state.msg.getUniqueId() + "/" + _req.readNextTunnelId()); _state.msg.getUniqueId() + "/" + _req.readNextTunnelId());
} }
} }
@ -524,8 +531,8 @@ class BuildHandler {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Responding to " + state.msg.getUniqueId() + "/" + ourId _log.debug("Responding to " + state.msg.getUniqueId() + "/" + ourId
+ " after " + recvDelay + "/" + proactiveDrops + " with " + response + " after " + recvDelay + "/" + proactiveDrops + " with " + response
+ " from " + (state.fromHash != null ? state.fromHash.toBase64() : + " from " + (state.fromHash != null ? state.fromHash :
state.from != null ? state.from.calculateHash().toBase64() : "tunnel")); state.from != null ? state.from.calculateHash() : "tunnel"));
HopConfig cfg = null; HopConfig cfg = null;
if (response == 0) { if (response == 0) {
@ -600,13 +607,13 @@ class BuildHandler {
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Read slot " + ourSlot + " containing our hop @ " + _context.routerHash().toBase64() _log.debug("Read slot " + ourSlot + " containing our hop @ " + _context.routerHash()
+ " accepted? " + response + " receiving on " + ourId + " accepted? " + response + " receiving on " + ourId
+ " sending to " + nextId + " sending to " + nextId
+ " on " + nextPeer.toBase64() + " on " + nextPeer
+ " inGW? " + isInGW + " outEnd? " + isOutEnd + " time difference " + (now-time) + " inGW? " + isInGW + " outEnd? " + isOutEnd + " time difference " + (now-time)
+ " recvDelay " + recvDelay + " replyMessage " + req.readReplyMessageId() + " recvDelay " + recvDelay + " replyMessage " + req.readReplyMessageId()
+ " replyKey " + req.readReplyKey().toBase64() + " replyIV " + Base64.encode(req.readReplyIV())); + " replyKey " + req.readReplyKey() + " replyIV " + Base64.encode(req.readReplyIV()));
// now actually send the response // now actually send the response
if (!isOutEnd) { if (!isOutEnd) {
@ -672,7 +679,7 @@ class BuildHandler {
PooledTunnelCreatorConfig cfg = _exec.removeFromBuilding(reqId); PooledTunnelCreatorConfig cfg = _exec.removeFromBuilding(reqId);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive tunnel build message " + reqId + " from " _log.debug("Receive tunnel build message " + reqId + " from "
+ (from != null ? from.calculateHash().toBase64() : fromHash != null ? fromHash.toBase64() : "tunnels") + (from != null ? from.calculateHash() : fromHash != null ? fromHash : "tunnels")
+ ", found matching tunnel? " + (cfg != null)); + ", found matching tunnel? " + (cfg != null));
if (cfg != null) { if (cfg != null) {
if (!cfg.isInbound()) { if (!cfg.isInbound()) {
@ -762,7 +769,7 @@ class BuildHandler {
public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) { public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive tunnel build reply message " + receivedMessage.getUniqueId() + " from " _log.debug("Receive tunnel build reply message " + receivedMessage.getUniqueId() + " from "
+ (fromHash != null ? fromHash.toBase64() : from != null ? from.calculateHash().toBase64() : "a tunnel")); + (fromHash != null ? fromHash : from != null ? from.calculateHash() : "a tunnel"));
handleReply(new BuildReplyMessageState(receivedMessage)); handleReply(new BuildReplyMessageState(receivedMessage));
return _buildReplyMessageHandlerJob; return _buildReplyMessageHandlerJob;
} }