2005-02-22 jrandom

* Reworked the tunnel (re)building process to remove the tokens and
      provide cleaner controls on the tunnels built.
    * Fixed situations where the timestamper wanted to test more servers than
      were provided (thanks Tracker!)
    * Get rid of the dead SAM sessions by using the streaming lib's callbacks
      (thanks Tracker!)
This commit is contained in:
jrandom
2005-02-23 04:20:28 +00:00
committed by zzz
parent 8a21f0efec
commit 10ed058c2e
11 changed files with 111 additions and 200 deletions

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.146 $ $Date: 2005/02/22 02:07:33 $";
public final static String ID = "$Revision: 1.147 $ $Date: 2005/02/22 17:58:22 $";
public final static String VERSION = "0.5";
public final static long BUILD = 5;
public final static long BUILD = 6;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -8,13 +8,11 @@ import net.i2p.router.tunnel.TunnelCreatorConfig;
class ExpireJob extends JobImpl {
private TunnelPool _pool;
private TunnelCreatorConfig _cfg;
private Object _buildToken;
private boolean _leaseUpdated;
public ExpireJob(RouterContext ctx, TunnelCreatorConfig cfg, TunnelPool pool, Object buildToken) {
public ExpireJob(RouterContext ctx, TunnelCreatorConfig cfg, TunnelPool pool) {
super(ctx);
_pool = pool;
_cfg = cfg;
_buildToken = buildToken;
_leaseUpdated = false;
// give 'em some extra time before dropping 'em
getTiming().setStartAfter(cfg.getExpiration()); // + Router.CLOCK_FUDGE_FACTOR);

View File

@ -13,17 +13,13 @@ import net.i2p.util.Log;
class OnCreatedJob extends JobImpl {
private Log _log;
private TunnelPool _pool;
private Object _buildToken;
private PooledTunnelCreatorConfig _cfg;
private boolean _fake;
public OnCreatedJob(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg, boolean fake, Object buildToken) {
public OnCreatedJob(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg) {
super(ctx);
_log = ctx.logManager().getLog(OnCreatedJob.class);
_pool = pool;
_cfg = cfg;
_fake = fake;
_buildToken = buildToken;
}
public String getName() { return "Tunnel built"; }
public void runJob() {
@ -34,17 +30,16 @@ class OnCreatedJob extends JobImpl {
getContext().tunnelDispatcher().joinOutbound(_cfg);
}
_pool.addTunnel(_cfg);
TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool, _buildToken) : null);
RebuildJob rebuildJob = (_fake ? null : new RebuildJob(getContext(), _cfg, _pool, _buildToken));
ExpireJob expireJob = new ExpireJob(getContext(), _cfg, _pool, _buildToken);
TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null);
RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool);
ExpireJob expireJob = new ExpireJob(getContext(), _cfg, _pool);
_cfg.setTunnelPool(_pool);
_cfg.setTestJob(testJob);
_cfg.setRebuildJob(rebuildJob);
_cfg.setExpireJob(expireJob);
if (_cfg.getLength() > 1) // no need to test 0 hop tunnels
getContext().jobQueue().addJob(testJob);
if (!_fake) // if we built a 0 hop tunnel in response to a failure, don't rebuild
getContext().jobQueue().addJob(rebuildJob);
getContext().jobQueue().addJob(rebuildJob); // always try to rebuild (ignored if too many)
getContext().jobQueue().addJob(expireJob);
}
}

View File

@ -11,20 +11,18 @@ import net.i2p.router.tunnel.TunnelCreatorConfig;
*/
class RebuildJob extends JobImpl {
private TunnelPool _pool;
private Object _buildToken;
private TunnelCreatorConfig _cfg;
public RebuildJob(RouterContext ctx, TunnelCreatorConfig cfg, TunnelPool pool, Object buildToken) {
public RebuildJob(RouterContext ctx, TunnelCreatorConfig cfg, TunnelPool pool) {
super(ctx);
_pool = pool;
_cfg = cfg;
_buildToken = buildToken;
long rebuildOn = cfg.getExpiration() - pool.getSettings().getRebuildPeriod();
rebuildOn -= ctx.random().nextInt(pool.getSettings().getRebuildPeriod()*2);
getTiming().setStartAfter(rebuildOn);
}
public String getName() { return "Rebuild tunnel"; }
public void runJob() {
_pool.getBuilder().buildTunnel(getContext(), _pool, _buildToken);
_pool.getBuilder().buildTunnel(getContext(), _pool);
}
}

View File

@ -24,19 +24,17 @@ import net.i2p.util.Log;
class TestJob extends JobImpl {
private Log _log;
private TunnelPool _pool;
private Object _buildToken;
private PooledTunnelCreatorConfig _cfg;
private boolean _found;
/** base to randomize the test delay on */
private static final int TEST_DELAY = 60*1000;
public TestJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, TunnelPool pool, Object buildToken) {
public TestJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, TunnelPool pool) {
super(ctx);
_log = ctx.logManager().getLog(TestJob.class);
_pool = pool;
_cfg = cfg;
_buildToken = buildToken;
getTiming().setStartAfter(getDelay() + ctx.clock().now());
ctx.statManager().createRateStat("tunnel.testFailedTime", "How long did the failure take (max of 60s for full timeout)?", "Tunnels",
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });

View File

@ -27,50 +27,42 @@ public class TunnelBuilder {
* jobs are built. This call does not block.
*
*/
public void buildTunnel(RouterContext ctx, TunnelPool pool, Object poolToken) {
buildTunnel(ctx, pool, false, poolToken);
public void buildTunnel(RouterContext ctx, TunnelPool pool) {
buildTunnel(ctx, pool, false);
}
public void buildTunnel(RouterContext ctx, TunnelPool pool, boolean fake, Object poolToken) {
if (!pool.keepBuilding(poolToken))
return;
public void buildTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) {
// this is probably overkill (ya think?)
pool.refreshSettings();
PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, fake);
if ( (cfg == null) && (!fake) ) {
RetryJob j = new RetryJob(ctx, pool, poolToken);
PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop);
if (cfg == null) {
RetryJob j = new RetryJob(ctx, pool);
j.getTiming().setStartAfter(ctx.clock().now() + ctx.random().nextInt(30*1000));
ctx.jobQueue().addJob(j);
return;
}
OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg, fake, poolToken);
RetryJob onFailed= (fake ? null : new RetryJob(ctx, pool, poolToken));
OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg);
RetryJob onFailed= (zeroHop ? null : new RetryJob(ctx, pool));
// queue up a job to request the endpoint to join the tunnel, which then
// requeues up another job for earlier hops, etc, until it reaches the
// gateway. after the gateway is confirmed, onCreated is fired
RequestTunnelJob req = new RequestTunnelJob(ctx, cfg, onCreated, onFailed, cfg.getLength()-1, fake, pool.getSettings().isExploratory());
if (fake) // lets get it done inline, as we /need/ it asap
RequestTunnelJob req = new RequestTunnelJob(ctx, cfg, onCreated, onFailed, cfg.getLength()-1, zeroHop, pool.getSettings().isExploratory());
if (zeroHop || (cfg.getLength() <= 1) ) // lets get it done inline, as we /need/ it asap
req.runJob();
else
ctx.jobQueue().addJob(req);
}
private PooledTunnelCreatorConfig configTunnel(RouterContext ctx, TunnelPool pool, boolean fake) {
private PooledTunnelCreatorConfig configTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) {
Log log = ctx.logManager().getLog(TunnelBuilder.class);
TunnelPoolSettings settings = pool.getSettings();
long expiration = ctx.clock().now() + settings.getDuration();
List peers = null;
long failures = countFailures(ctx);
boolean failing = (failures > 5) && (pool.getSettings().getAllowZeroHop());
boolean failsafe = false;
if (failing && (ctx.random().nextInt(100) < failures) )
failsafe = true;
if (fake || failsafe) {
if (zeroHop) {
peers = new ArrayList(1);
peers.add(ctx.routerHash());
if ( (failsafe) && (log.shouldLog(Log.WARN)) )
if (log.shouldLog(Log.WARN))
log.warn("Building failsafe tunnel for " + pool);
} else {
peers = pool.getSelector().selectPeers(ctx, settings);
@ -80,10 +72,10 @@ public class TunnelBuilder {
// the pool is refusing 0 hop tunnels
if (peers == null) {
if (log.shouldLog(Log.ERROR))
log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss! fake=" + fake);
log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss! fake=" + zeroHop);
} else {
if (log.shouldLog(Log.ERROR))
log.error("No peers to put in the new tunnel! selectPeers returned an empty list?! fake=" + fake);
log.error("No peers to put in the new tunnel! selectPeers returned an empty list?! fake=" + zeroHop);
}
return null;
}
@ -108,36 +100,20 @@ public class TunnelBuilder {
return cfg;
}
private long countFailures(RouterContext ctx) {
RateStat rs = ctx.statManager().getRate("tunnel.testFailedTime");
if (rs == null)
return 0;
Rate r = rs.getRate(10*60*1000);
if (r == null)
return 0;
else
return r.getCurrentEventCount();
}
/**
* If the building fails, try, try again.
*
*/
private class RetryJob extends JobImpl {
private TunnelPool _pool;
private Object _buildToken;
public RetryJob(RouterContext ctx, TunnelPool pool, Object buildToken) {
public RetryJob(RouterContext ctx, TunnelPool pool) {
super(ctx);
_pool = pool;
_buildToken = buildToken;
}
public String getName() { return "tunnel create failed"; }
public String getName() { return "Tunnel create failed"; }
public void runJob() {
// yikes, nothing left, lets get some backup (if we're allowed)
if ( (_pool.selectTunnel() == null) && (_pool.getSettings().getAllowZeroHop()) )
_pool.buildFake();
buildTunnel(getContext(), _pool, _buildToken);
_pool.refreshBuilders();
}
}
}

View File

@ -30,14 +30,6 @@ public class TunnelPool {
private boolean _alive;
private long _lifetimeProcessed;
/**
* list of pool tokens (Object) passed around during building/rebuilding/etc.
* if/when the token is removed from this list, that sequence of building/rebuilding/etc
* should cease (though others may continue).
*
*/
private List _tokens;
public TunnelPool(RouterContext ctx, TunnelPoolManager mgr, TunnelPoolSettings settings, TunnelPeerSelector sel, TunnelBuilder builder) {
_context = ctx;
_log = ctx.logManager().getLog(TunnelPool.class);
@ -46,7 +38,6 @@ public class TunnelPool {
_tunnels = new ArrayList(settings.getLength() + settings.getBackupQuantity());
_peerSelector = sel;
_builder = builder;
_tokens = new ArrayList(settings.getBackupQuantity() + settings.getQuantity());
_alive = false;
_lifetimeProcessed = 0;
refreshSettings();
@ -70,102 +61,39 @@ public class TunnelPool {
}
public void shutdown() {
_alive = false;
synchronized (_tokens) { _tokens.clear(); }
}
private int countUsableTunnels() {
int valid = 0;
synchronized (_tunnels) {
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > _context.clock().now() + 3*_settings.getRebuildPeriod())
valid++;
}
}
return valid;
}
private int refreshBuilders() {
/**
* Fire up as many buildTunnel tasks as necessary, returning how many
* were added
*
*/
int refreshBuilders() {
if (!_alive) return 0;
// only start up new build tasks if we need more of 'em
int target = _settings.getQuantity() + _settings.getBackupQuantity();
int oldTokenCount = 0;
List newTokens = null;
synchronized (_tokens) {
oldTokenCount = _tokens.size();
while (_tokens.size() > target)
_tokens.remove(0);
if (_tokens.size() < target) {
int wanted = target - _tokens.size();
newTokens = new ArrayList(wanted);
for (int i = 0; i < wanted; i++) {
Object token = new Object();
newTokens.add(token);
_tokens.add(token);
}
}
}
int usableTunnels = countUsableTunnels();
if (newTokens != null) {
if (_log.shouldLog(Log.INFO))
_log.info(toString() + ": refreshing builders, previously had " + oldTokenCount
if ( (target > usableTunnels) && (_log.shouldLog(Log.INFO)) )
_log.info(toString() + ": refreshing builders, previously had " + usableTunnels
+ ", want a total of " + target + ", creating "
+ newTokens.size() + " new ones.");
for (int i = 0; i < newTokens.size(); i++) {
Object token = newTokens.get(i);
if (_log.shouldLog(Log.DEBUG))
_log.debug(toString() + ": Building a tunnel with the token " + token);
_builder.buildTunnel(_context, this, token);
}
return newTokens.size();
} else {
return 0;
}
}
/** do we still need this sequence of build/rebuild/etc to continue? */
public boolean keepBuilding(Object token) {
boolean connected = true;
boolean rv = false;
int remaining = 0;
int wanted = _settings.getQuantity() + _settings.getBackupQuantity();
if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) )
connected = false;
synchronized (_tokens) {
if (!connected) {
// client disconnected, so stop rebuilding this series
_tokens.remove(token);
rv = false;
} else {
rv = _tokens.contains(token);
}
remaining = _tokens.size();
}
+ (target-usableTunnels) + " new ones.");
for (int i = usableTunnels; i < target; i++)
_builder.buildTunnel(_context, this);
if (remaining <= 0) {
_manager.removeTunnels(_settings.getDestination());
}
if (!rv) {
if (_log.shouldLog(Log.INFO))
_log.info(toString() + ": keepBuilding does NOT want building to continue (want "
+ wanted + ", have " + remaining);
} else {
boolean needed = true;
int valid = 0;
synchronized (_tunnels) {
if (_tunnels.size() > wanted) {
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > _context.clock().now() + 3*_settings.getRebuildPeriod()) {
valid++;
if (valid >= wanted*2)
break;
}
}
if (valid >= wanted*2)
needed = false;
}
}
if (!needed) {
if (_log.shouldLog(Log.WARN))
_log.warn(toString() + ": keepBuilding wants building to continue, but not "
+ " with the current object... # tunnels = " + valid + ", wanted = " + wanted);
synchronized (_tokens) {
_tokens.remove(token);
}
return false;
}
}
return rv;
return (target > usableTunnels ? target-usableTunnels : 0);
}
void refreshSettings() {
@ -210,7 +138,7 @@ public class TunnelPool {
}
if (_alive && _settings.getAllowZeroHop())
buildFake();
buildFallback();
if (allowRecurseOnFail)
return selectTunnel(false);
else
@ -244,6 +172,8 @@ public class TunnelPool {
}
}
int getTunnelCount() { synchronized (_tunnels) { return _tunnels.size(); } }
public TunnelBuilder getBuilder() { return _builder; }
public TunnelPoolSettings getSettings() { return _settings; }
public void setSettings(TunnelPoolSettings settings) {
@ -274,6 +204,8 @@ public class TunnelPool {
if (ls != null)
_context.clientManager().requestLeaseSet(_settings.getDestination(), ls);
refreshBuilders();
}
public void removeTunnel(TunnelInfo info) {
@ -298,9 +230,17 @@ public class TunnelPool {
_log.warn(toString() + ": unable to build a new leaseSet on removal (" + remaining
+ " remaining), request a new tunnel");
if (_settings.getAllowZeroHop())
buildFake();
buildFallback();
}
}
boolean connected = true;
if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) )
connected = false;
if ( (getTunnelCount() <= 0) && (!connected) ) {
_manager.removeTunnels(_settings.getDestination());
return;
}
refreshBuilders();
}
@ -326,7 +266,7 @@ public class TunnelPool {
_log.warn(toString() + ": unable to build a new leaseSet on failure (" + remaining
+ " remaining), request a new tunnel");
if (remaining < _settings.getBackupQuantity() + _settings.getQuantity())
buildFake(false);
buildFallback();
}
}
refreshBuilders();
@ -350,43 +290,23 @@ public class TunnelPool {
+ " remaining), request a new tunnel");
if ( (remaining < _settings.getBackupQuantity() + _settings.getQuantity())
&& (_settings.getAllowZeroHop()) )
buildFake();
buildFallback();
}
}
}
void buildFake() { buildFake(true); }
void buildFake(boolean zeroHop) {
void buildFallback() {
int quantity = _settings.getBackupQuantity() + _settings.getQuantity();
boolean needed = true;
synchronized (_tunnels) {
if (_tunnels.size() > quantity) {
int valid = 0;
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > _context.clock().now() + 3*_settings.getRebuildPeriod()) {
valid++;
if (valid >= quantity)
break;
}
}
if (valid >= quantity)
needed = false;
}
}
if (!needed) return;
int usable = countUsableTunnels();
if (usable >= quantity) return;
if (_log.shouldLog(Log.INFO))
_log.info(toString() + ": building a fake tunnel (allow zeroHop? " + zeroHop + ")");
Object tempToken = new Object();
synchronized (_tokens) {
_tokens.add(tempToken);
}
_builder.buildTunnel(_context, this, zeroHop, tempToken);
synchronized (_tokens) {
_tokens.remove(tempToken);
}
_log.info(toString() + ": building a fallback tunnel (usable: " + usable + " needed: " + quantity + ")");
if ( (usable == 0) && (_settings.getAllowZeroHop()) )
_builder.buildTunnel(_context, this, true);
else
_builder.buildTunnel(_context, this);
refreshBuilders();
}
/**

View File

@ -26,12 +26,14 @@ import net.i2p.router.TunnelInfo;
import net.i2p.router.TunnelManagerFacade;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.util.Log;
/**
*
*/
public class TunnelPoolManager implements TunnelManagerFacade {
private RouterContext _context;
private Log _log;
/** Hash (destination) to TunnelPool */
private Map _clientInboundPools;
/** Hash (destination) to TunnelPool */
@ -41,6 +43,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
public TunnelPoolManager(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(TunnelPoolManager.class);
HandlerJobBuilder builder = new HandleTunnelCreateMessageJob.Builder(ctx);
ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelCreateMessage.MESSAGE_TYPE, builder);
@ -63,7 +66,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
public TunnelInfo selectInboundTunnel() {
TunnelInfo info = _inboundExploratory.selectTunnel();
if (info == null) {
_inboundExploratory.buildFake();
_inboundExploratory.buildFallback();
// still can be null, but probably not
info = _inboundExploratory.selectTunnel();
}
@ -80,6 +83,9 @@ public class TunnelPoolManager implements TunnelManagerFacade {
if (pool != null) {
return pool.selectTunnel();
}
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "wtf, want the inbound tunnel for " + destination.calculateHash().toBase64() +
" but there isn't a pool?");
return null;
}
@ -87,7 +93,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
public TunnelInfo selectOutboundTunnel() {
TunnelInfo info = _outboundExploratory.selectTunnel();
if (info == null) {
_outboundExploratory.buildFake();
_outboundExploratory.buildFallback();
// still can be null, but probably not
info = _outboundExploratory.selectTunnel();
}
@ -224,6 +230,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
public void removeTunnels(Hash destination) {
if (destination == null) return;
if (_context.clientManager().isLocal(destination)) {
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "wtf, why are you removing the pool for " + destination.toBase64(), new Exception("i did it"));
}
TunnelPool inbound = null;
TunnelPool outbound = null;
synchronized (_clientInboundPools) {
@ -268,7 +278,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
public String getName() { return "Bootstrap tunnel pool"; }
public void runJob() {
_pool.buildFake(false);
_pool.buildFallback();
}
}