* allow 2 failures in a tunnel before killing the tunnel. this is useful because
much of our tunnel failure detection code itself uses tunnels - send out tunnel 1 and get the reply through tunnel 2. If it fails, which one "broke"? * we now add a failure mark to both in all situations, including during tunnel creation * properly check the tunnel expiration 2-2.5 minutes prior to completion, rather than the old 0.5-1.5 minutes.
This commit is contained in:
@ -55,6 +55,7 @@ public class TunnelInfo extends DataStructureImpl {
|
||||
private boolean _ready;
|
||||
private boolean _wasEverReady;
|
||||
private int _messagesProcessed;
|
||||
private int _tunnelFailures;
|
||||
|
||||
public TunnelInfo(I2PAppContext context) {
|
||||
_context = context;
|
||||
@ -77,6 +78,7 @@ public class TunnelInfo extends DataStructureImpl {
|
||||
_created = _context.clock().now();
|
||||
_lastTested = -1;
|
||||
_messagesProcessed = 0;
|
||||
_tunnelFailures = 0;
|
||||
}
|
||||
|
||||
public TunnelId getTunnelId() { return _id; }
|
||||
@ -182,6 +184,13 @@ public class TunnelInfo extends DataStructureImpl {
|
||||
/** we have just processed a message for this tunnel */
|
||||
public void messageProcessed() { _messagesProcessed++; }
|
||||
|
||||
/**
|
||||
* the tunnel was (potentially) unable to pass a message through.
|
||||
*
|
||||
* @return the new number of tunnel failures ever for this tunnel
|
||||
*/
|
||||
public int incrementFailures() { return ++_tunnelFailures; }
|
||||
|
||||
public void readBytes(InputStream in) throws DataFormatException, IOException {
|
||||
_options = DataHelper.readProperties(in);
|
||||
Boolean includeDest = DataHelper.readBoolean(in);
|
||||
|
@ -131,7 +131,7 @@ class ClientTunnelPool {
|
||||
*
|
||||
*/
|
||||
public int getSafePoolSize() {
|
||||
return getSafePoolSize(0);
|
||||
return getSafePoolSize(2*60*1000);
|
||||
}
|
||||
/**
|
||||
* Get the safe # pools at some point in the future
|
||||
@ -140,7 +140,7 @@ class ClientTunnelPool {
|
||||
*/
|
||||
public int getSafePoolSize(long futureMs) {
|
||||
int numSafe = 0;
|
||||
long expireAfter = _context.clock().now() + Router.CLOCK_FUDGE_FACTOR + futureMs;
|
||||
long expireAfter = _context.clock().now() + futureMs;
|
||||
for (Iterator iter = getInboundTunnelIds().iterator(); iter.hasNext(); ) {
|
||||
TunnelId id = (TunnelId)iter.next();
|
||||
TunnelInfo info = getInboundTunnel(id);
|
||||
|
@ -50,7 +50,7 @@ class ClientTunnelPoolManagerJob extends JobImpl {
|
||||
return;
|
||||
}
|
||||
int requestedPoolSize = _clientPool.getClientSettings().getNumInboundTunnels();
|
||||
int safePoolSize = _clientPool.getSafePoolSize(POOL_CHECK_DELAY);
|
||||
int safePoolSize = _clientPool.getSafePoolSize(2*60*1000 + POOL_CHECK_DELAY);
|
||||
if (safePoolSize < requestedPoolSize) {
|
||||
requestMoreTunnels(requestedPoolSize-safePoolSize);
|
||||
}
|
||||
|
@ -260,8 +260,8 @@ public class RequestTunnelJob extends JobImpl {
|
||||
_log.info("Sending tunnel create to " + _target.getIdentity().getHash().toBase64() +
|
||||
" to inbound gateway " + _inboundGateway.getGateway().toBase64() +
|
||||
" : " + _inboundGateway.getTunnelId().getTunnelId());
|
||||
ReplyJob onReply = new Success(_participant, _wrappedKey, _wrappedTags, _wrappedTo);
|
||||
Job onFail = new Failure(_participant);
|
||||
ReplyJob onReply = new Success(_participant, _wrappedKey, _wrappedTags, _wrappedTo, _inboundGateway.getTunnelId(), _outboundTunnel);
|
||||
Job onFail = new Failure(_participant, _inboundGateway.getTunnelId(), _outboundTunnel);
|
||||
MessageSelector selector = new Selector(_participant);
|
||||
SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), _garlicMessage,
|
||||
_outboundTunnel, _target.getIdentity().getHash(),
|
||||
@ -550,9 +550,11 @@ public class RequestTunnelJob extends JobImpl {
|
||||
private SessionKey _wrappedKey;
|
||||
private Set _wrappedTags;
|
||||
private PublicKey _wrappedTo;
|
||||
private TunnelId _replyTunnelId;
|
||||
private TunnelId _outboundTunnelId;
|
||||
private long _started;
|
||||
|
||||
public Success(TunnelInfo tunnel, SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo) {
|
||||
public Success(TunnelInfo tunnel, SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo, TunnelId replyTunnelId, TunnelId outboundTunnelId) {
|
||||
super(RequestTunnelJob.this.getContext());
|
||||
_tunnel = tunnel;
|
||||
_messages = new LinkedList();
|
||||
@ -560,6 +562,8 @@ public class RequestTunnelJob extends JobImpl {
|
||||
_wrappedKey = wrappedKey;
|
||||
_wrappedTags = wrappedTags;
|
||||
_wrappedTo = wrappedTo;
|
||||
_replyTunnelId = replyTunnelId;
|
||||
_outboundTunnelId = outboundTunnelId;
|
||||
_started = getContext().clock().now();
|
||||
}
|
||||
|
||||
@ -644,10 +648,14 @@ public class RequestTunnelJob extends JobImpl {
|
||||
|
||||
private class Failure extends JobImpl {
|
||||
private TunnelInfo _tunnel;
|
||||
private TunnelId _outboundTunnelId;
|
||||
private TunnelId _replyTunnelId;
|
||||
private long _started;
|
||||
public Failure(TunnelInfo tunnel) {
|
||||
public Failure(TunnelInfo tunnel, TunnelId replyTunnelId, TunnelId outboundTunnelId) {
|
||||
super(RequestTunnelJob.this.getContext());
|
||||
_tunnel = tunnel;
|
||||
_replyTunnelId = replyTunnelId;
|
||||
_outboundTunnelId = outboundTunnelId;
|
||||
_started = getContext().clock().now();
|
||||
}
|
||||
|
||||
@ -669,6 +677,11 @@ public class RequestTunnelJob extends JobImpl {
|
||||
// perhaps not an explicit reject, but an implicit one (due to dropped messages, tunnel failure, etc)
|
||||
getContext().profileManager().tunnelRejected(_tunnel.getThisHop(), responseTime, false);
|
||||
getContext().profileManager().messageFailed(_tunnel.getThisHop());
|
||||
|
||||
// one (or both) of the tunnels used to send the request / receive a reply failed
|
||||
_pool.tunnelFailed(_replyTunnelId);
|
||||
_pool.tunnelFailed(_outboundTunnelId);
|
||||
|
||||
Failure.this.getContext().statManager().updateFrequency("tunnel.buildFailFrequency");
|
||||
fail();
|
||||
}
|
||||
|
@ -524,13 +524,22 @@ class TunnelPool {
|
||||
return found;
|
||||
}
|
||||
|
||||
private static final int MAX_FAILURES_PER_TUNNEL = 2;
|
||||
|
||||
public void tunnelFailed(TunnelId id) {
|
||||
if (!_isLive) return;
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Tunnel " + id + " marked as not ready, since it /failed/", new Exception("Failed tunnel"));
|
||||
TunnelInfo info = getTunnelInfo(id);
|
||||
if (info == null)
|
||||
return;
|
||||
int failures = info.incrementFailures();
|
||||
if (failures <= MAX_FAILURES_PER_TUNNEL) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Tunnel " + id + " failure " + failures + ", but not fatal yet");
|
||||
return;
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Tunnel " + id + " marked as not ready, since it /failed/", new Exception("Failed tunnel"));
|
||||
_context.messageHistory().tunnelFailed(info.getTunnelId());
|
||||
info.setIsReady(false);
|
||||
Hash us = _context.routerHash();
|
||||
|
Reference in New Issue
Block a user