2006-01-22 jrandom

* New tunnel build process - does not use the new crypto or new peer
      selection strategies.  However, it does drop the fallback tunnel
      procedure, except for tunnels who are configured to allow them, or for
      the exploratory pool during bootstrapping or after a catastrophic
      failure.  This new process prefers to fail rather than use too-short
      tunnels, so while it can do some pretty aggressive tunnel rebuilding,
      it may expose more tunnel failures to the user.
    * Always prefer normal tunnels to fallback tunnels.
    * Potential fix for a bug while changing i2cp settings on I2PSnark (thanks
      bar!)
    * Do all of the netDb entry writing in a separate thread, avoiding
      duplicates and batching them up.
This commit is contained in:
jrandom
2006-01-23 00:51:54 +00:00
committed by zzz
parent cd235e5902
commit 13fe45b489
20 changed files with 696 additions and 557 deletions

View File

@ -125,7 +125,13 @@ public class ConnectionAcceptor implements Runnable
if (socketChanged) {
continue;
} else {
Snark.debug("Null socket accepted, but socket wasn't changed?", Snark.ERROR);
I2PServerSocket ss = I2PSnarkUtil.instance().getServerSocket();
if (ss != serverSocket) {
serverSocket = ss;
} else {
Snark.debug("Null socket accepted, but socket wasn't changed?", Snark.ERROR);
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
}
} else {
Thread t = new I2PThread(new Handler(socket), "Connection-" + socket);

View File

@ -52,7 +52,8 @@ public class SnarkManager implements Snark.CompleteListener {
while (_messages.size() > MAX_MESSAGES)
_messages.remove(0);
}
_log.info("MSG: " + message);
if (_log.shouldLog(Log.INFO))
_log.info("MSG: " + message);
}
/** newest last */

View File

@ -1,4 +1,18 @@
$Id: history.txt,v 1.392 2006/01/18 01:37:53 cervantes Exp $
$Id: history.txt,v 1.393 2006/01/19 23:40:24 complication Exp $
2006-01-22 jrandom
* New tunnel build process - does not use the new crypto or new peer
selection strategies. However, it does drop the fallback tunnel
procedure, except for tunnels who are configured to allow them, or for
the exploratory pool during bootstrapping or after a catastrophic
failure. This new process prefers to fail rather than use too-short
tunnels, so while it can do some pretty aggressive tunnel rebuilding,
it may expose more tunnel failures to the user.
* Always prefer normal tunnels to fallback tunnels.
* Potential fix for a bug while changing i2cp settings on I2PSnark (thanks
bar!)
* Do all of the netDb entry writing in a separate thread, avoiding
duplicates and batching them up.
2006-01-19 Complication
* Explain better where eepsite's destkey can be found

View File

@ -364,7 +364,12 @@ public class Router {
public static final char CAPABILITY_UNREACHABLE = 'U';
public static final String PROP_FORCE_UNREACHABLE = "router.forceUnreachable";
public static final char CAPABILITY_NEW_TUNNEL = 'T';
public void addReachabilityCapability(RouterInfo ri) {
// routers who can understand TunnelBuildMessages
////ri.addCapability(CAPABILITY_NEW_TUNNEL);
String forceUnreachable = _context.getProperty(PROP_FORCE_UNREACHABLE);
if ( (forceUnreachable != null) && ("true".equalsIgnoreCase(forceUnreachable)) ) {
ri.addCapability(CAPABILITY_UNREACHABLE);

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.336 $ $Date: 2006/01/18 01:38:49 $";
public final static String ID = "$Revision: 1.337 $ $Date: 2006/01/19 23:40:25 $";
public final static String VERSION = "0.6.1.9";
public final static long BUILD = 5;
public final static long BUILD = 6;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -74,7 +74,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade {
startup();
}
private static final long MAX_TIME_TO_REBUILD = 5*60*1000;
private static final long MAX_TIME_TO_REBUILD = 10*60*1000;
public boolean verifyClientLiveliness() {
if (_manager == null) return true;
boolean lively = true;

View File

@ -14,6 +14,7 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.*;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataStructure;
@ -22,6 +23,7 @@ import net.i2p.data.LeaseSet;
import net.i2p.data.RouterInfo;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
@ -33,6 +35,7 @@ class PersistentDataStore extends TransientDataStore {
private Log _log;
private String _dbDir;
private KademliaNetworkDatabaseFacade _facade;
private Writer _writer;
private final static int READ_DELAY = 60*1000;
@ -42,6 +45,12 @@ class PersistentDataStore extends TransientDataStore {
_dbDir = dbDir;
_facade = facade;
_context.jobQueue().addJob(new ReadJob());
ctx.statManager().createRateStat("netDb.writeClobber", "How often we clobber a pending netDb write", "Network Database", new long[] { 60*1000, 10*60*1000 });
ctx.statManager().createRateStat("netDb.writePending", "How many pending writes are there", "Network Database", new long[] { 60*1000, 10*60*1000 });
_writer = new Writer();
I2PThread writer = new I2PThread(_writer, "DBWriter");
writer.setDaemon(true);
writer.start();
}
public void restart() {
@ -56,7 +65,7 @@ class PersistentDataStore extends TransientDataStore {
public void put(Hash key, DataStructure data) {
if ( (data == null) || (key == null) ) return;
super.put(key, data);
_context.jobQueue().addJob(new WriteJob(key, data));
_writer.queue(key, data);
}
public int countLeaseSets() {
@ -103,61 +112,99 @@ class PersistentDataStore extends TransientDataStore {
}
}
private class WriteJob extends JobImpl {
private Hash _key;
private DataStructure _data;
public WriteJob(Hash key, DataStructure data) {
super(PersistentDataStore.this._context);
_key = key;
_data = data;
private class Writer implements Runnable {
private Map _keys;
private List _keyOrder;
public Writer() {
_keys = new HashMap(64);
_keyOrder = new ArrayList(64);
}
public String getName() { return "DB Writer Job"; }
public void runJob() {
_log.info("Writing key " + _key);
FileOutputStream fos = null;
File dbFile = null;
try {
String filename = null;
File dbDir = getDbDir();
if (_data instanceof LeaseSet)
filename = getLeaseSetName(_key);
else if (_data instanceof RouterInfo)
filename = getRouterInfoName(_key);
else
throw new IOException("We don't know how to write objects of type " + _data.getClass().getName());
dbFile = new File(dbDir, filename);
long dataPublishDate = getPublishDate();
if (dbFile.lastModified() < dataPublishDate) {
// our filesystem is out of date, lets replace it
fos = new FileOutputStream(dbFile);
try {
_data.writeBytes(fos);
fos.close();
dbFile.setLastModified(dataPublishDate);
} catch (DataFormatException dfe) {
_log.error("Error writing out malformed object as " + _key + ": "
+ _data, dfe);
dbFile.delete();
public void queue(Hash key, DataStructure data) {
boolean exists = false;
int pending = 0;
synchronized (_keys) {
pending = _keys.size();
exists = (null != _keys.put(key, data));
if (!exists)
_keyOrder.add(key);
_keys.notifyAll();
}
if (exists)
_context.statManager().addRateData("netDb.writeClobber", pending, 0);
_context.statManager().addRateData("netDb.writePending", pending, 0);
}
public void run() {
Hash key = null;
DataStructure data = null;
while (true) { // hmm, probably want a shutdown handle... though this is a daemon thread
try {
synchronized (_keys) {
if (_keyOrder.size() <= 0) {
_keys.wait();
} else {
key = (Hash)_keyOrder.remove(0);
data = (DataStructure)_keys.remove(key);
}
}
} else {
// we've already written the file, no need to waste our time
}
} catch (IOException ioe) {
_log.error("Error writing out the object", ioe);
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
} catch (InterruptedException ie) {}
if ( (key != null) && (data != null) )
write(key, data);
key = null;
data = null;
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
}
private long getPublishDate() {
if (_data instanceof RouterInfo) {
return ((RouterInfo)_data).getPublished();
} else if (_data instanceof LeaseSet) {
return ((LeaseSet)_data).getEarliestLeaseDate();
}
private void write(Hash key, DataStructure data) {
_log.info("Writing key " + key);
FileOutputStream fos = null;
File dbFile = null;
try {
String filename = null;
File dbDir = getDbDir();
if (data instanceof LeaseSet)
filename = getLeaseSetName(key);
else if (data instanceof RouterInfo)
filename = getRouterInfoName(key);
else
throw new IOException("We don't know how to write objects of type " + data.getClass().getName());
dbFile = new File(dbDir, filename);
long dataPublishDate = getPublishDate(data);
if (dbFile.lastModified() < dataPublishDate) {
// our filesystem is out of date, lets replace it
fos = new FileOutputStream(dbFile);
try {
data.writeBytes(fos);
fos.close();
dbFile.setLastModified(dataPublishDate);
} catch (DataFormatException dfe) {
_log.error("Error writing out malformed object as " + key + ": "
+ data, dfe);
dbFile.delete();
}
} else {
return -1;
// we've already written the file, no need to waste our time
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not writing " + key.toBase64() + ", as its up to date on disk (file mod-publish=" +
(dbFile.lastModified()-dataPublishDate) + ")");
}
} catch (IOException ioe) {
_log.error("Error writing out the object", ioe);
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
}
}
private long getPublishDate(DataStructure data) {
if (data instanceof RouterInfo) {
return ((RouterInfo)data).getPublished();
} else if (data instanceof LeaseSet) {
return ((LeaseSet)data).getEarliestLeaseDate();
} else {
return -1;
}
}

View File

@ -30,6 +30,7 @@ public class IsFailingCalculator extends Calculator {
public boolean calcBoolean(PeerProfile profile) {
// have we failed in the last 119 seconds?
/*
if ( (profile.getCommError().getRate(60*1000).getCurrentEventCount() > 0) ||
(profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ||
(profile.getCommError().getRate(10*60*1000).getCurrentEventCount() > 0) ) {
@ -38,7 +39,7 @@ public class IsFailingCalculator extends Calculator {
+ " is failing because it had a comm error recently ");
return true;
} else {
*/
//if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) ||
// (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) {
// // are they overloaded (or disconnected)?
@ -76,6 +77,6 @@ public class IsFailingCalculator extends Calculator {
return true;
return false;
}
//}
}
}

View File

@ -0,0 +1,251 @@
package net.i2p.router.tunnel.pool;
import java.util.*;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.router.tunnel.TunnelCreatorConfig;
import net.i2p.util.Log;
/**
* Single threaded controller of the tunnel creation process, spanning all tunnel pools.
* Essentially, this loops across the pools, sees which want to build tunnels, and fires
* off the necessary activities if the load allows. If nothing wants to build any tunnels,
* it waits for a short period before looping again (or until it is told that something
* changed, such as a tunnel failed, new client started up, or tunnel creation was aborted).
*
*/
class BuildExecutor implements Runnable {
private RouterContext _context;
private Log _log;
private TunnelPoolManager _manager;
/** list of TunnelCreatorConfig elements of tunnels currently being built */
private List _currentlyBuilding;
private boolean _isRunning;
public BuildExecutor(RouterContext ctx, TunnelPoolManager mgr) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_manager = mgr;
_currentlyBuilding = new ArrayList(10);
_context.statManager().createRateStat("tunnel.concurrentBuilds", "How many builds are going at once", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_context.statManager().createRateStat("tunnel.concurrentBuildsLagged", "How many builds are going at once when we reject further builds, due to job lag (period is lag)", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
}
private int allowed() {
StringBuffer buf = null;
if (_log.shouldLog(Log.DEBUG)) {
buf = new StringBuffer(128);
buf.append("Allowed: ");
}
int allowed = 20;
String prop = _context.getProperty("router.tunnelConcurrentBuilds");
if (prop != null)
try { allowed = Integer.valueOf(prop).intValue(); } catch (NumberFormatException nfe) {}
int concurrent = 0;
synchronized (_currentlyBuilding) {
concurrent = _currentlyBuilding.size();
allowed -= concurrent;
if (buf != null)
buf.append(allowed).append(" ").append(_currentlyBuilding.toString());
}
if (buf != null)
_log.debug(buf.toString());
_context.statManager().addRateData("tunnel.concurrentBuilds", concurrent, 0);
long lag = _context.jobQueue().getMaxLag();
if ( (lag > 2000) && (_context.router().getUptime() > 5*60*1000) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too lagged [" + lag + "], don't allow building");
_context.statManager().addRateData("tunnel.concurrentBuildsLagged", concurrent, lag);
return 0; // if we have a job heavily blocking our jobqueue, ssllloowww dddooowwwnnn
}
//if (isOverloaded())
// return 0;
return allowed;
}
public void run() {
_isRunning = true;
List wanted = new ArrayList(8);
List pools = new ArrayList(8);
while (!_manager.isShutdown()) {
try {
_manager.listPools(pools);
for (int i = 0; i < pools.size(); i++) {
TunnelPool pool = (TunnelPool)pools.get(i);
int howMany = pool.countHowManyToBuild();
for (int j = 0; j < howMany; j++)
wanted.add(pool);
}
int allowed = allowed();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allowed: " + allowed + " wanted: " + wanted);
// zero hop ones can run inline
allowed = buildZeroHopTunnels(wanted, allowed);
if ( (allowed > 0) && (wanted.size() > 0) ) {
Collections.shuffle(wanted, _context.random());
for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) {
TunnelPool pool = (TunnelPool)wanted.remove(0);
//if (pool.countWantedTunnels() <= 0)
// continue;
PooledTunnelCreatorConfig cfg = pool.configureNewTunnel();
if (cfg != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg);
synchronized (_currentlyBuilding) {
_currentlyBuilding.add(cfg);
}
buildTunnel(pool, cfg);
if (cfg.getLength() <= 1)
i--; //0hop, we can keep going, as there's no worry about throttling
} else {
i--;
}
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothin' doin, wait for a while");
try {
synchronized (_currentlyBuilding) {
if (allowed <= 0)
_currentlyBuilding.wait(_context.random().nextInt(5*1000));
else // wanted <= 0
_currentlyBuilding.wait(_context.random().nextInt(30*1000));
}
} catch (InterruptedException ie) {
// someone wanted to build something
}
}
wanted.clear();
pools.clear();
} catch (Exception e) {
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "B0rked in the tunnel builder", e);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Done building");
_isRunning = false;
}
/**
* iterate over the 0hop tunnels, running them all inline regardless of how many are allowed
* @return number of tunnels allowed after processing these zero hop tunnels (almost always the same as before)
*/
private int buildZeroHopTunnels(List wanted, int allowed) {
for (int i = 0; i < wanted.size(); i++) {
TunnelPool pool = (TunnelPool)wanted.get(0);
if (pool.getSettings().getLength() == 0) {
PooledTunnelCreatorConfig cfg = pool.configureNewTunnel();
if (cfg != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Configuring short tunnel " + i + " for " + pool + ": " + cfg);
synchronized (_currentlyBuilding) {
_currentlyBuilding.add(cfg);
}
buildTunnel(pool, cfg);
if (cfg.getLength() > 1) {
allowed--; // oops... shouldn't have done that, but hey, its not that bad...
}
wanted.remove(i);
i--;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Configured a null tunnel");
}
}
}
return allowed;
}
public boolean isRunning() { return _isRunning; }
void buildTunnel(TunnelPool pool, PooledTunnelCreatorConfig cfg) {
// old style here, replace with the new crypto stuff later
CompleteJob onCreated = new CompleteJob(_context, cfg, new SuccessJob(_context, cfg, pool), pool);
CompleteJob onFailed = new CompleteJob(_context, cfg, null, pool);
RequestTunnelJob j = new RequestTunnelJob(_context, cfg, onCreated, onFailed, cfg.getLength()-1, false, cfg.getDestination()==null);
if (cfg.getLength() <= 1) // length == 1 ==> hops = 0, so do it inline (as its immediate)
j.runJob();
else
j.runJob(); // always inline, as this is on its own thread so it can block
//_context.jobQueue().addJob(j);
}
public void buildComplete(PooledTunnelCreatorConfig cfg, TunnelPool pool) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Build complete for " + cfg);
pool.buildComplete(cfg);
synchronized (_currentlyBuilding) {
_currentlyBuilding.remove(cfg);
_currentlyBuilding.notifyAll();
}
}
public void repoll() {
synchronized (_currentlyBuilding) { _currentlyBuilding.notifyAll(); }
}
private class CompleteJob extends JobImpl {
private PooledTunnelCreatorConfig _cfg;
private TunnelPool _pool;
private Job _onRun;
public CompleteJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, Job onRun, TunnelPool pool) {
super(ctx);
_cfg = cfg;
_onRun = onRun;
_pool = pool;
}
public String getName() { return "Tunnel create complete"; }
public void runJob() {
if (_onRun != null)
_onRun.runJob();
//getContext().jobQueue().addJob(_onRun);
buildComplete(_cfg, _pool);
}
}
private class SuccessJob extends JobImpl {
private PooledTunnelCreatorConfig _cfg;
private TunnelPool _pool;
public SuccessJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, TunnelPool pool) {
super(ctx);
_cfg = cfg;
_pool = pool;
}
public String getName() { return "Tunnel create successful"; }
public void runJob() {
_log.debug("Created successfully: " + _cfg);
if (_cfg.isInbound()) {
getContext().tunnelDispatcher().joinInbound(_cfg);
} else {
getContext().tunnelDispatcher().joinOutbound(_cfg);
}
_pool.addTunnel(_cfg);
_pool.getManager().buildComplete(_cfg);
TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null);
//RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool);
ExpireJob expireJob = new ExpireJob(getContext(), _cfg, _pool);
_cfg.setTunnelPool(_pool);
_cfg.setTestJob(testJob);
//_cfg.setRebuildJob(rebuildJob);
_cfg.setExpireJob(expireJob);
if (_cfg.getLength() > 1) // no need to test 0 hop tunnels
getContext().jobQueue().addJob(testJob);
//getContext().jobQueue().addJob(rebuildJob); // always try to rebuild (ignored if too many)
getContext().jobQueue().addJob(expireJob);
}
}
}

View File

@ -14,6 +14,8 @@ class ClientPeerSelector extends TunnelPeerSelector {
int length = getLength(ctx, settings);
if (length < 0)
return null;
if ( (length == 0) && (settings.getLength()+settings.getLengthVariance() > 0) )
return null;
HashSet matches = new HashSet(length);
if (length > 0) {

View File

@ -30,7 +30,11 @@ class ExploratoryPeerSelector extends TunnelPeerSelector {
Set exclude = getExclude(ctx, settings.isInbound(), settings.isExploratory());
exclude.add(ctx.routerHash());
HashSet matches = new HashSet(length);
ctx.profileOrganizer().selectNotFailingPeers(length, exclude, matches, false);
boolean exploreHighCap = Boolean.valueOf(ctx.getProperty("router.exploreHighCapacity", "false")).booleanValue();
if (exploreHighCap)
ctx.profileOrganizer().selectHighCapacityPeers(length, exclude, matches);
else
ctx.profileOrganizer().selectNotFailingPeers(length, exclude, matches, false);
if (l.shouldLog(Log.DEBUG))
l.debug("profileOrganizer.selectNotFailing(" + length + ") found " + matches);

View File

@ -128,11 +128,16 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
if (_log.shouldLog(Log.INFO))
_log.info("join as inbound tunnel gateway pointing at "
+ _request.getNextRouter().toBase64().substring(0,4) + ":"
+ _request.getNextTunnelId().getTunnelId()
+ _request.getNextTunnelId()
+ " (nonce=" + _request.getNonce() + ")");
// serve as the inbound tunnel gateway
cfg.setSendTo(_request.getNextRouter());
cfg.setSendTunnelId(DataHelper.toLong(4, _request.getNextTunnelId().getTunnelId()));
TunnelId id = _request.getNextTunnelId();
if (id == null) {
sendRejection(TunnelHistory.TUNNEL_REJECT_CRIT);
return;
}
cfg.setSendTunnelId(DataHelper.toLong(4, id.getTunnelId()));
getContext().tunnelDispatcher().joinInboundGateway(cfg);
} else if (_request.getNextRouter() == null) {
if (_log.shouldLog(Log.INFO))
@ -143,11 +148,16 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
if (_log.shouldLog(Log.INFO))
_log.info("join as tunnel participant pointing at "
+ _request.getNextRouter().toBase64().substring(0,4) + ":"
+ _request.getNextTunnelId().getTunnelId()
+ _request.getNextTunnelId()
+ " (nonce=" + _request.getNonce() + ")");
// serve as a general participant
cfg.setSendTo(_request.getNextRouter());
cfg.setSendTunnelId(DataHelper.toLong(4, _request.getNextTunnelId().getTunnelId()));
TunnelId id = _request.getNextTunnelId();
if (id == null) {
sendRejection(TunnelHistory.TUNNEL_REJECT_CRIT);
return;
}
cfg.setSendTunnelId(DataHelper.toLong(4, id.getTunnelId()));
getContext().tunnelDispatcher().joinParticipant(cfg);
}

View File

@ -1,46 +0,0 @@
package net.i2p.router.tunnel.pool;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* The tunnel is fully built, so now add it to our handler, to the pool, and
* build the necessary test and rebuilding jobs.
*
*/
class OnCreatedJob extends JobImpl {
private Log _log;
private TunnelPool _pool;
private PooledTunnelCreatorConfig _cfg;
public OnCreatedJob(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg) {
super(ctx);
_log = ctx.logManager().getLog(OnCreatedJob.class);
_pool = pool;
_cfg = cfg;
}
public String getName() { return "Tunnel built"; }
public void runJob() {
_log.debug("Created successfully: " + _cfg);
if (_cfg.isInbound()) {
getContext().tunnelDispatcher().joinInbound(_cfg);
} else {
getContext().tunnelDispatcher().joinOutbound(_cfg);
}
_pool.getManager().buildComplete(_cfg);
_pool.addTunnel(_cfg);
TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null);
RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool);
ExpireJob expireJob = new ExpireJob(getContext(), _cfg, _pool);
_cfg.setTunnelPool(_pool);
_cfg.setTestJob(testJob);
_cfg.setRebuildJob(rebuildJob);
_cfg.setExpireJob(expireJob);
if (_cfg.getLength() > 1) // no need to test 0 hop tunnels
getContext().jobQueue().addJob(testJob);
getContext().jobQueue().addJob(rebuildJob); // always try to rebuild (ignored if too many)
getContext().jobQueue().addJob(expireJob);
}
}

View File

@ -13,7 +13,6 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
private TunnelPool _pool;
private boolean _failed;
private TestJob _testJob;
private RebuildJob _rebuildJob;
private Job _expireJob;
/** Creates a new instance of PooledTunnelCreatorConfig */
@ -48,11 +47,6 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
// selected again. _expireJob is left to do its thing, in case there
// are any straggling messages coming down the tunnel
_pool.tunnelFailed(this);
if (_rebuildJob != null) {
// rebuild asap (_rebuildJob will be null if we were just a stopgap)
_rebuildJob.getTiming().setStartAfter(_context.clock().now() + 10*1000);
_context.jobQueue().addJob(_rebuildJob);
}
if (_testJob != null) // just in case...
_context.jobQueue().removeJob(_testJob);
}
@ -61,6 +55,5 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
public TunnelPool getTunnelPool() { return _pool; }
public void setTestJob(TestJob job) { _testJob = job; }
public void setRebuildJob(RebuildJob job) { _rebuildJob = job; }
public void setExpireJob(Job job) { _expireJob = job; }
}

View File

@ -1,61 +0,0 @@
package net.i2p.router.tunnel.pool;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.router.tunnel.TunnelCreatorConfig;
/**
* Build a new tunnel to replace the existing one before it expires. This job
* should be removed (or scheduled to run immediately) if the tunnel fails.
* If an exploratory tunnel build at a random time between 3 1/2 and 4 minutes early;
* else if only one tunnel in pool build 4 minutes early;
* otherwise build at a random time between 2 and 4 minutes early.
* Five build attempts in parallel if an exploratory tunnel.
*/
class RebuildJob extends JobImpl {
private TunnelPool _pool;
private TunnelCreatorConfig _cfg;
public RebuildJob(RouterContext ctx, TunnelCreatorConfig cfg, TunnelPool pool) {
super(ctx);
_pool = pool;
_cfg = cfg;
long rebuildOn;
if (_pool.getSettings().isExploratory()) {
rebuildOn = cfg.getExpiration() - (((pool.getSettings().getRebuildPeriod() * 7) / 2));
rebuildOn -= ctx.random().nextInt(pool.getSettings().getRebuildPeriod() / 2);
} else if ((pool.getSettings().getQuantity() + pool.getSettings().getBackupQuantity()) == 1) {
rebuildOn = cfg.getExpiration() - (pool.getSettings().getRebuildPeriod() * 4);
} else {
rebuildOn = cfg.getExpiration() - (pool.getSettings().getRebuildPeriod() * 2);
rebuildOn -= ctx.random().nextInt(pool.getSettings().getRebuildPeriod() * 2);
}
getTiming().setStartAfter(rebuildOn);
}
public String getName() {
if (_pool.getSettings().isExploratory()) {
if (_pool.getSettings().isInbound()) {
return "Rebuild exploratory inbound tunnel";
} else {
return "Rebuild exploratory outbound tunnel";
}
} else {
StringBuffer rv = new StringBuffer(32);
if (_pool.getSettings().isInbound())
rv.append("Rebuild inbound client tunnel for ");
else
rv.append("Rebuild outbound client tunnel for ");
if (_pool.getSettings().getDestinationNickname() != null)
rv.append(_pool.getSettings().getDestinationNickname());
else
rv.append(_pool.getSettings().getDestination().toBase64().substring(0,4));
return rv.toString();
}
}
public void runJob() {
if (_pool.getSettings().isExploratory())
_pool.refreshBuilders(4, 4);
else
_pool.refreshBuilders(1, 4);
}
}

View File

@ -45,8 +45,9 @@ public class RequestTunnelJob extends JobImpl {
private boolean _isFake;
private boolean _isExploratory;
static final int HOP_REQUEST_TIMEOUT = 20*1000;
private static final int LOOKUP_TIMEOUT = 10*1000;
static final int HOP_REQUEST_TIMEOUT_CLIENT = 15*1000;
static final int HOP_REQUEST_TIMEOUT_EXPLORATORY = 10*1000;
private static final int LOOKUP_TIMEOUT = 5*1000;
public RequestTunnelJob(RouterContext ctx, TunnelCreatorConfig cfg, Job onCreated, Job onFailed, int hop, boolean isFake, boolean isExploratory) {
super(ctx);
@ -58,7 +59,7 @@ public class RequestTunnelJob extends JobImpl {
_currentPeer = null;
_lookups = 0;
_lastSendTime = 0;
_isFake = isFake;
_isFake = isFake || (cfg.getLength() <= 1);
_isExploratory = isExploratory;
ctx.statManager().createRateStat("tunnel.receiveRejectionProbabalistic", "How often we are rejected probabalistically?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
@ -77,6 +78,8 @@ public class RequestTunnelJob extends JobImpl {
ctx.statManager().createRateStat("tunnel.buildExploratorySuccess3Hop", "How often we succeed building a 3 hop exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildPartialTime", "How long a non-exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratoryPartialTime", "How long an exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildExploratoryTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("tunnel.buildClientTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting hop " + hop + " in " + cfg);
@ -136,7 +139,7 @@ public class RequestTunnelJob extends JobImpl {
else
getContext().jobQueue().addJob(_onCreated);
}
getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
//getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
}
} else {
// outbound tunnel, we're the gateway and hence the last person asked
@ -176,7 +179,8 @@ public class RequestTunnelJob extends JobImpl {
_onCreated.runJob();
else
getContext().jobQueue().addJob(_onCreated);
getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
if (_config.getLength() > 1)
getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
}
}
}
@ -301,15 +305,18 @@ public class RequestTunnelJob extends JobImpl {
getContext().jobQueue().addJob(_onCreated);
if (_isExploratory) {
int i = _config.getLength();
getContext().statManager().addRateData("tunnel.buildExploratorySuccess", 1, 0);
if (i > 1)
getContext().statManager().addRateData("tunnel.buildExploratorySuccess", 1, 0);
if (i == 2)
getContext().statManager().addRateData("tunnel.buildExploratorySuccess1Hop", 1, 0);
else if (i == 3)
getContext().statManager().addRateData("tunnel.buildExploratorySuccess2Hop", 1, 0);
else if (i == 4)
getContext().statManager().addRateData("tunnel.buildExploratorySuccess3Hop", 1, 0);
} else
getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
} else {
if (_config.getLength() > 1)
getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0);
}
}
}
@ -357,6 +364,10 @@ public class RequestTunnelJob extends JobImpl {
if (_log.shouldLog(Log.WARN))
_log.warn("request timeout: " + _config + " at hop " + _currentHop
+ " with nonce " + _nonce);
if (_isExploratory)
getContext().statManager().addRateData("tunnel.buildExploratoryTimeout", 1, 0);
else
getContext().statManager().addRateData("tunnel.buildClientTimeout", 1, 0);
peerFail(0);
}
}
@ -369,7 +380,7 @@ public class RequestTunnelJob extends JobImpl {
public ReplySelector(long nonce) {
_nonce = nonce;
_nonceFound = false;
_expiration = getContext().clock().now() + HOP_REQUEST_TIMEOUT;
_expiration = getContext().clock().now() + (_isExploratory ? HOP_REQUEST_TIMEOUT_EXPLORATORY : HOP_REQUEST_TIMEOUT_CLIENT);
}
public boolean continueMatching() {
return (!_nonceFound) && (getContext().clock().now() < _expiration);

View File

@ -35,8 +35,6 @@ class SendGarlicMessageJob extends JobImpl {
private SessionKey _sentKey;
private Set _sentTags;
private static final int TIMEOUT = RequestTunnelJob.HOP_REQUEST_TIMEOUT;
public SendGarlicMessageJob(RouterContext ctx, I2NPMessage payload, RouterInfo target, MessageSelector selector, ReplyJob onReply, Job onTimeout, SessionKey sentKey, Set sentTags) {
super(ctx);
_log = ctx.logManager().getLog(SendGarlicMessageJob.class);
@ -61,14 +59,15 @@ class SendGarlicMessageJob extends JobImpl {
payload.setRecipient(_target);
payload.setDeliveryInstructions(instructions);
payload.setRequestAck(false);
payload.setExpiration(getContext().clock().now() + RequestTunnelJob.HOP_REQUEST_TIMEOUT);
payload.setExpiration(_payload.getMessageExpiration());
int timeout = (int)(payload.getExpiration() - getContext().clock().now());
GarlicMessage msg = GarlicMessageBuilder.buildMessage(getContext(), payload, _sentKey, _sentTags);
// so we will look for the reply
OutNetMessage dummyMessage = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, TIMEOUT);
OutNetMessage dummyMessage = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, timeout);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling timeout job (" + _onTimeout + ") to be run in " + TIMEOUT + "ms");
_log.debug("Scheduling timeout job (" + _onTimeout + ") to be run in " + timeout + "ms");
// now find an outbound tunnel and send 'er off
TunnelInfo out = getContext().tunnelManager().selectOutboundTunnel();

View File

@ -1,116 +0,0 @@
package net.i2p.router.tunnel.pool;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.router.JobImpl;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.util.Log;
/**
*
*/
public class TunnelBuilder {
/**
* Build a new tunnel per the pool's wishes (using the preferred length,
* peers, ordering, etc). After the tunnel is built, it is added to the
* pool as well as the dispatcher, and the necessary test and maintenance
* jobs are built. This call does not block.
*
*/
public void buildTunnel(RouterContext ctx, TunnelPool pool) {
buildTunnel(ctx, pool, false);
}
public void buildTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) {
if (!pool.isAlive()) {
pool.getManager().buildComplete();
return;
}
// this is probably overkill (ya think?)
pool.refreshSettings();
PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop);
if (cfg == null) {
pool.getManager().buildComplete();
return;
}
OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg);
RetryJob onFailed= new RetryJob(ctx, pool);
// queue up a job to request the endpoint to join the tunnel, which then
// requeues up another job for earlier hops, etc, until it reaches the
// gateway. after the gateway is confirmed, onCreated is fired
RequestTunnelJob req = new RequestTunnelJob(ctx, cfg, onCreated, onFailed, cfg.getLength()-1, zeroHop, pool.getSettings().isExploratory());
if (zeroHop || (cfg.getLength() <= 1) ) // lets get it done inline, as we /need/ it asap
req.runJob();
else
ctx.jobQueue().addJob(req);
}
private PooledTunnelCreatorConfig configTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) {
Log log = ctx.logManager().getLog(TunnelBuilder.class);
TunnelPoolSettings settings = pool.getSettings();
long expiration = ctx.clock().now() + settings.getDuration();
List peers = null;
if (zeroHop) {
peers = new ArrayList(1);
peers.add(ctx.routerHash());
if (log.shouldLog(Log.WARN))
log.warn("Building failsafe tunnel for " + pool);
} else {
peers = pool.getSelector().selectPeers(ctx, settings);
}
if ( (peers == null) || (peers.size() <= 0) ) {
// no inbound or outbound tunnels to send the request through, and
// the pool is refusing 0 hop tunnels
if (peers == null) {
if (log.shouldLog(Log.ERROR))
log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss! fake=" + zeroHop);
} else {
if (log.shouldLog(Log.ERROR))
log.error("No peers to put in the new tunnel! selectPeers returned an empty list?! fake=" + zeroHop);
}
return null;
}
PooledTunnelCreatorConfig cfg = new PooledTunnelCreatorConfig(ctx, peers.size(), settings.isInbound(), settings.getDestination());
// peers[] is ordered endpoint first, but cfg.getPeer() is ordered gateway first
for (int i = 0; i < peers.size(); i++) {
int j = peers.size() - 1 - i;
cfg.setPeer(j, (Hash)peers.get(i));
HopConfig hop = cfg.getConfig(j);
hop.setExpiration(expiration);
hop.setIVKey(ctx.keyGenerator().generateSessionKey());
hop.setLayerKey(ctx.keyGenerator().generateSessionKey());
// tunnelIds will be updated during building, and as the creator, we
// don't need to worry about prev/next hop
}
cfg.setExpiration(expiration);
Log l = ctx.logManager().getLog(TunnelBuilder.class);
if (l.shouldLog(Log.DEBUG))
l.debug("Config contains " + peers + ": " + cfg);
return cfg;
}
/**
* If the building fails, try, try again.
*
*/
private class RetryJob extends JobImpl {
private TunnelPool _pool;
public RetryJob(RouterContext ctx, TunnelPool pool) {
super(ctx);
_pool = pool;
}
public String getName() { return "Tunnel create failed"; }
public void runJob() {
// yikes, nothing left, lets get some backup (if we're allowed)
_pool.getManager().buildComplete();
_pool.refreshBuilders(1, 4);
}
}
}

View File

@ -15,6 +15,7 @@ import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.router.TunnelInfo;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.util.Log;
/**
@ -26,164 +27,50 @@ public class TunnelPool {
private TunnelPoolSettings _settings;
private ArrayList _tunnels;
private TunnelPeerSelector _peerSelector;
private TunnelBuilder _builder;
private TunnelPoolManager _manager;
private boolean _alive;
private long _lifetimeProcessed;
private int _buildsThisMinute;
private long _currentMinute;
private RefreshJob _refreshJob;
private TunnelInfo _lastSelected;
private long _lastSelectionPeriod;
private int _expireSkew;
/**
* Only 10 builds per minute per pool, even if we have failing tunnels,
* etc. On overflow, the necessary additional tunnels are built by the
* RefreshJob
*/
private static final int MAX_BUILDS_PER_MINUTE = 10;
public TunnelPool(RouterContext ctx, TunnelPoolManager mgr, TunnelPoolSettings settings, TunnelPeerSelector sel, TunnelBuilder builder) {
public TunnelPool(RouterContext ctx, TunnelPoolManager mgr, TunnelPoolSettings settings, TunnelPeerSelector sel) {
_context = ctx;
_log = ctx.logManager().getLog(TunnelPool.class);
_manager = mgr;
_settings = settings;
_tunnels = new ArrayList(settings.getLength() + settings.getBackupQuantity());
_peerSelector = sel;
_builder = builder;
_alive = false;
_lastSelectionPeriod = 0;
_lastSelected = null;
_lifetimeProcessed = 0;
_buildsThisMinute = 0;
_currentMinute = ctx.clock().now();
_refreshJob = new RefreshJob(ctx);
_expireSkew = _context.random().nextInt(90*1000);
refreshSettings();
}
public void startup() {
_alive = true;
_refreshJob.getTiming().setStartAfter(_context.clock().now() + 60*1000);
_context.jobQueue().addJob(_refreshJob);
int added = refreshBuilders(0, 0);
if (added <= 0) {
_manager.getExecutor().repoll();
if (_settings.isInbound() && (_settings.getDestination() != null) ) {
// we just reconnected and didn't require any new tunnel builders.
// however, we /do/ want a leaseSet, so build one
LeaseSet ls = null;
synchronized (_tunnels) {
if (_settings.isInbound() && (_settings.getDestination() != null) )
ls = locked_buildNewLeaseSet();
ls = locked_buildNewLeaseSet();
}
if (ls != null)
_context.clientManager().requestLeaseSet(_settings.getDestination(), ls);
}
}
public void shutdown() {
_alive = false;
_lastSelectionPeriod = 0;
_lastSelected = null;
}
/**
* Return number of tunnels expiring greater than
* timeFactor * RebuildPeriod from now
*
*/
private int countUsableTunnels(int timeFactor) {
int valid = 0;
synchronized (_tunnels) {
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > _context.clock().now() + (timeFactor * _settings.getRebuildPeriod()))
valid++;
}
}
return valid;
}
/**
* Fire up as many buildTunnel tasks as necessary, returning how many
* were added.
* Build maxBuild tunnels (0 = unlimited), use timeFactor * RebuildPeriod.
* Fire off up to six extra jobs if an exploratory tunnel is
* requested by RebuildJob or tunnelFailed (maxBuild > 1).
* Throttle builds to a maximum per minute; reduce maximum if job lag is high,
* or if we have network errors which indicate we are disconnected from the network.
* Override pool length setting and build a 1-hop tunnel if time is short.
*
*/
int refreshBuilders(int maxBuild, int timeFactor) {
if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) )
_alive = false;
if (!_alive) return 0;
// only start up new build tasks if we need more of 'em
int baseTarget = _settings.getQuantity() + _settings.getBackupQuantity();
int target = baseTarget;
int usableTunnels = countUsableTunnels(timeFactor);
if (_settings.isExploratory() && target > 0 && maxBuild > 1)
target+= 6;
if ( (target > usableTunnels) )
if ( (target > usableTunnels) && (_log.shouldLog(Log.INFO)) )
_log.info(toString() + ": refreshing builders, previously had " + usableTunnels
+ ", want a total of " + target + ", creating "
+ (target-usableTunnels) + " new ones (" + maxBuild + " max).");
if (target > usableTunnels) {
long minute = _context.clock().now();
minute = minute - (minute % 60*1000);
if (_currentMinute < minute) {
_currentMinute = minute;
_buildsThisMinute = 0;
}
int build = (target - usableTunnels);
if (maxBuild > 0 && build > maxBuild)
build = maxBuild;
int buildThrottle = MAX_BUILDS_PER_MINUTE;
long lag = _context.jobQueue().getMaxLag();
int netErrors = (int) _context.statManager().getRate("udp.sendException").getRate(60*1000).getLastEventCount();
if (lag > 3 * 1000 || netErrors > 5) {
if (_log.shouldLog(Log.WARN))
_log.warn("Throttling tunnel builds lag = " + lag + "; net errors = " + netErrors);
if (_settings.isExploratory())
buildThrottle = 3;
else
buildThrottle = 1;
} else if (lag > 1 * 1000) {
if (_settings.isExploratory())
buildThrottle = 5;
else
buildThrottle = 2;
}
if (build > (buildThrottle - _buildsThisMinute))
build = (buildThrottle - _buildsThisMinute);
if (build <= 0) return 0;
if ((_settings.isExploratory() && baseTarget > countUsableTunnels(1)) ||
((!_settings.isExploratory()) && baseTarget > countUsableTunnels(0)) ||
((!_settings.isExploratory()) && countUsableTunnels(1) == 0))
_settings.setLengthOverride(1);
else
_settings.setLengthOverride(0);
int wanted = build;
build = _manager.allocateBuilds(build);
if ( (wanted != build) && (_log.shouldLog(Log.ERROR)) )
_log.error("Wanted to build " + wanted + " tunnels, but throttled down to "
+ build + ", due to concurrent requests (cpu overload?)");
for (int i = 0; i < build; i++)
_builder.buildTunnel(_context, this);
_buildsThisMinute += build;
return build;
} else {
return 0;
}
}
TunnelPoolManager getManager() { return _manager; }
void refreshSettings() {
@ -222,6 +109,8 @@ public class TunnelPool {
*/
public TunnelInfo selectTunnel() { return selectTunnel(true); }
private TunnelInfo selectTunnel(boolean allowRecurseOnFail) {
boolean avoidZeroHop = ((getSettings().getLength() + getSettings().getLengthVariance()) > 0);
long period = curPeriod();
synchronized (_tunnels) {
if (_lastSelectionPeriod == period) {
@ -237,8 +126,21 @@ public class TunnelPool {
if (_log.shouldLog(Log.WARN))
_log.warn(toString() + ": No tunnels to select from");
} else {
// pick 'em randomly
Collections.shuffle(_tunnels, _context.random());
// if there are nonzero hop tunnels and the zero hop tunnels are fallbacks,
// avoid the zero hop tunnels
if (avoidZeroHop) {
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if ( (info.getLength() > 1) && (info.getExpiration() > _context.clock().now()) ) {
_lastSelected = info;
return info;
}
}
}
// ok, either we are ok using zero hop tunnels, or only fallback tunnels remain. pick 'em
// randomly
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > _context.clock().now()) {
@ -287,16 +189,18 @@ public class TunnelPool {
}
}
/** list of tunnelInfo instances of tunnels currently being built */
public List listPending() { synchronized (_inProgress) { return new ArrayList(_inProgress); } }
int getTunnelCount() { synchronized (_tunnels) { return _tunnels.size(); } }
public TunnelBuilder getBuilder() { return _builder; }
public TunnelPoolSettings getSettings() { return _settings; }
public void setSettings(TunnelPoolSettings settings) {
_settings = settings;
if (_settings != null) {
if (_log.shouldLog(Log.INFO))
_log.info(toString() + ": Settings updated on the pool: " + settings);
refreshBuilders(1, 4); // to start/stop new sequences, in case the quantities changed
_manager.getExecutor().repoll(); // in case we need more
}
}
public TunnelPeerSelector getSelector() { return _peerSelector; }
@ -309,7 +213,7 @@ public class TunnelPool {
public void addTunnel(TunnelInfo info) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(toString() + ": Adding tunnel " + info);
_log.debug(toString() + ": Adding tunnel " + info, new Exception("Creator"));
LeaseSet ls = null;
synchronized (_tunnels) {
_tunnels.add(info);
@ -356,6 +260,8 @@ public class TunnelPool {
}
}
_manager.getExecutor().repoll();
boolean connected = true;
if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) )
connected = false;
@ -381,31 +287,14 @@ public class TunnelPool {
}
}
_manager.tunnelFailed();
_lifetimeProcessed += cfg.getProcessedMessagesCount();
if (_settings.isInbound() && (_settings.getDestination() != null) ) {
if (ls != null) {
_context.clientManager().requestLeaseSet(_settings.getDestination(), ls);
if (_settings.isExploratory())
refreshBuilders(3, 4);
else
refreshBuilders(1, 4);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(toString() + ": unable to build a new leaseSet on failure (" + remaining
+ " remaining), request a new tunnel");
if (remaining < _settings.getBackupQuantity() + _settings.getQuantity())
if (!buildFallback())
if (_settings.isExploratory())
refreshBuilders(3, 4);
else
refreshBuilders(1, 4);
}
} else {
if (_settings.isExploratory())
refreshBuilders(3, 4);
else
refreshBuilders(1, 4);
}
}
@ -421,13 +310,6 @@ public class TunnelPool {
}
if (ls != null) {
_context.clientManager().requestLeaseSet(_settings.getDestination(), ls);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(toString() + ": unable to build a new leaseSet on expire (" + remaining
+ " remaining), request a new tunnel");
if ( (remaining < _settings.getBackupQuantity() + _settings.getQuantity())
&& (_settings.getAllowZeroHop()) )
buildFallback();
}
}
}
@ -438,17 +320,28 @@ public class TunnelPool {
*/
boolean buildFallback() {
int quantity = _settings.getBackupQuantity() + _settings.getQuantity();
int usable = countUsableTunnels(1);
if (usable >= quantity) return false;
int usable = 0;
synchronized (_tunnels) {
usable = _tunnels.size();
}
if (usable > 0)
return false;
if ( (usable == 0) && (_settings.getAllowZeroHop()) ) {
if (_settings.getAllowZeroHop()) {
if ( (_settings.getLength() + _settings.getLengthVariance() > 0) &&
(_settings.getDestination() != null) &&
(_context.profileOrganizer().countActivePeers() > 0) ) {
// if it is a client tunnel pool and our variance doesn't allow 0 hop, prefer failure to
// 0 hop operation (unless our router is offline)
return false;
}
if (_log.shouldLog(Log.INFO))
_log.info(toString() + ": building a fallback tunnel (usable: " + usable + " needed: " + quantity + ")");
_builder.buildTunnel(_context, this, true);
// runs inline, since its 0hop
_manager.getExecutor().buildTunnel(this, configureNewTunnel(true));
return true;
}
//else
// _builder.buildTunnel(_context, this);
return false;
}
@ -507,6 +400,163 @@ public class TunnelPool {
public long getLifetimeProcessed() { return _lifetimeProcessed; }
/**
* Gather the data to see how many tunnels to build, and then actually compute that value (delegated to
* the countHowManyToBuild function below)
*
*/
public int countHowManyToBuild() {
int wanted = getSettings().getBackupQuantity() + getSettings().getQuantity();
boolean allowZeroHop = ((getSettings().getLength() + getSettings().getLengthVariance()) <= 0);
long expireAfter = _context.clock().now() + (2 * _settings.getRebuildPeriod());
expireAfter += _expireSkew;
long earliestExpire = -1;
int live = 0;
int fallback = 0;
int usable = 0;
synchronized (_tunnels) {
boolean enough = _tunnels.size() > wanted;
for (int i = 0; i < _tunnels.size(); i++) {
TunnelInfo info = (TunnelInfo)_tunnels.get(i);
if (info.getExpiration() > expireAfter) {
if (allowZeroHop || (info.getLength() > 1)) {
usable++;
if ( (info.getExpiration() < earliestExpire) || (earliestExpire < 0) )
earliestExpire = info.getExpiration();
}
}
live++;
if ( (info.getLength() <= 1) && (info.getExpiration() > expireAfter) )
fallback++;
}
}
if (usable < wanted) {
// if we are short on tunnels, build fast
earliestExpire = 0;
}
int inProgress = 0;
synchronized (_inProgress) {
inProgress = _inProgress.size();
for (int i = 0; i < _inProgress.size(); i++) {
PooledTunnelCreatorConfig cfg = (PooledTunnelCreatorConfig)_inProgress.get(i);
if (cfg.getLength() <= 1)
fallback++;
}
}
return countHowManyToBuild(allowZeroHop, earliestExpire, usable, wanted, inProgress, fallback);
}
/**
* This is the big scary function determining how many new tunnels we want to try to build at this
* point in time, as used by the BuildExecutor
*
* @param allowZeroHop do we normally allow zero hop tunnels? If true, treat fallback tunnels like normal ones
* @param earliestExpire how soon do some of our usable tunnels expire, or, if we are missing tunnels, -1
* @param usable how many tunnels will be around for a while (may include fallback tunnels)
* @param standardAmount how many tunnels we want to have, in general
* @param inProgress how many tunnels are being built for this pool right now (may include fallback tunnels)
* @param fallback how many zero hop tunnels do we have, or are being built
*/
private int countHowManyToBuild(boolean allowZeroHop, long earliestExpire, int usable, int standardAmount,
int inProgress, int fallback) {
int howMany = 0;
if (allowZeroHop)
howMany = standardAmount - usable;
else
howMany = standardAmount - (usable - fallback);
int concurrentBuildWeight = 1;
if (howMany > 0) {
long now = _context.clock().now();
if (earliestExpire - now < 60*1000)
concurrentBuildWeight = 4; // right before expiration, allow up to 4x quantity tunnels to be pending
else if (earliestExpire - now < 120*1000)
concurrentBuildWeight = 3; // allow up to 3x quantity tunnels to be pending from 1-2m
else if (earliestExpire - now < 180*1000)
concurrentBuildWeight = 2; // allow up to 2x quantity tunnels to be pending from 2-3m
// e.g. we want 3 tunnels, but only have 1 usable, we'd want 2 more. however, if the tunnels
// expire in 90 seconds, we'd act like we wanted 6 (and assume 4 would fail building).
howMany = (howMany * concurrentBuildWeight) - inProgress;
}
int rv = howMany;
// ok, we're actually swamped with tunnels, so lets hold off on replacing the
// fallback ones for a bit
if ( (usable + inProgress + fallback > 2*standardAmount) && (howMany > 0) )
rv = 0;
if (allowZeroHop && (rv > standardAmount))
rv = standardAmount;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Count: rv: " + rv + " howMany " + howMany + " concurrentWeight " + concurrentBuildWeight
+ " allow? " + allowZeroHop + " usable " + usable
+ " std " + standardAmount + " inProgress " + inProgress + " fallback " + fallback
+ " for " + toString());
if (rv < 0)
return 0;
return rv;
}
PooledTunnelCreatorConfig configureNewTunnel() { return configureNewTunnel(false); }
private PooledTunnelCreatorConfig configureNewTunnel(boolean forceZeroHop) {
TunnelPoolSettings settings = getSettings();
List peers = null;
long expiration = _context.clock().now() + settings.getDuration();
if (!forceZeroHop) {
peers = _peerSelector.selectPeers(_context, settings);
if ( (peers == null) || (peers.size() <= 0) ) {
// no inbound or outbound tunnels to send the request through, and
// the pool is refusing 0 hop tunnels
if (peers == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss!");
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("No peers to put in the new tunnel! selectPeers returned an empty list?!");
}
return null;
}
} else {
peers = new ArrayList(1);
peers.add(_context.routerHash());
}
PooledTunnelCreatorConfig cfg = new PooledTunnelCreatorConfig(_context, peers.size(), settings.isInbound(), settings.getDestination());
// peers[] is ordered endpoint first, but cfg.getPeer() is ordered gateway first
for (int i = 0; i < peers.size(); i++) {
int j = peers.size() - 1 - i;
cfg.setPeer(j, (Hash)peers.get(i));
HopConfig hop = cfg.getConfig(j);
hop.setExpiration(expiration);
hop.setIVKey(_context.keyGenerator().generateSessionKey());
hop.setLayerKey(_context.keyGenerator().generateSessionKey());
// tunnelIds will be updated during building, and as the creator, we
// don't need to worry about prev/next hop
}
cfg.setExpiration(expiration);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Config contains " + peers + ": " + cfg);
synchronized (_inProgress) {
_inProgress.add(cfg);
}
return cfg;
}
private List _inProgress = new ArrayList();
void buildComplete(PooledTunnelCreatorConfig cfg) {
synchronized (_inProgress) { _inProgress.remove(cfg); }
}
public String toString() {
if (_settings.isExploratory()) {
if (_settings.isInbound())
@ -527,27 +577,4 @@ public class TunnelPool {
}
}
/**
* We choke the # of rebuilds per pool per minute, so we need this to
* make sure to build enough tunnels.
*
*/
private class RefreshJob extends JobImpl {
public RefreshJob(RouterContext ctx) {
super(ctx);
}
public String getName() { return "Refresh " + TunnelPool.this.toString(); }
public void runJob() {
if (!_alive) return;
int added;
if (_settings.isExploratory())
added = refreshBuilders(0, 2);
else
added = refreshBuilders(0, 1);
if ( (added > 0) && (_log.shouldLog(Log.WARN)) )
_log.warn(added + " additional parallel rebuild(s) for " + TunnelPool.this.toString());
requeue(30*1000);
}
}
}

View File

@ -24,6 +24,7 @@ import net.i2p.router.TunnelManagerFacade;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.router.tunnel.TunnelCreatorConfig;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
@ -38,17 +39,9 @@ public class TunnelPoolManager implements TunnelManagerFacade {
private Map _clientOutboundPools;
private TunnelPool _inboundExploratory;
private TunnelPool _outboundExploratory;
/** how many build requests are in process */
private int _outstandingBuilds;
/** max # of concurrent build requests */
private int _maxOutstandingBuilds;
private LoadTestManager _loadTestManager;
private static final String PROP_MAX_OUTSTANDING_BUILDS = "router.tunnel.maxConcurrentBuilds";
private static final int DEFAULT_MAX_OUTSTANDING_BUILDS = 20;
private static final String PROP_THROTTLE_CONCURRENT_TUNNELS = "router.tunnel.shouldThrottle";
private static final boolean DEFAULT_THROTTLE_CONCURRENT_TUNNELS = false;
private BuildExecutor _executor;
private boolean _isShutdown;
public TunnelPoolManager(RouterContext ctx) {
_context = ctx;
@ -62,19 +55,14 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_clientInboundPools = new HashMap(4);
_clientOutboundPools = new HashMap(4);
_outstandingBuilds = 0;
_maxOutstandingBuilds = DEFAULT_MAX_OUTSTANDING_BUILDS;
String max = ctx.getProperty(PROP_MAX_OUTSTANDING_BUILDS, String.valueOf(DEFAULT_MAX_OUTSTANDING_BUILDS));
if (max != null) {
try {
_maxOutstandingBuilds = Integer.parseInt(max);
} catch (NumberFormatException nfe) {
_maxOutstandingBuilds = DEFAULT_MAX_OUTSTANDING_BUILDS;
}
}
_loadTestManager = new LoadTestManager(_context);
_isShutdown = false;
_executor = new BuildExecutor(ctx, this);
I2PThread execThread = new I2PThread(_executor, "BuildExecutor");
execThread.setDaemon(true);
execThread.start();
ctx.statManager().createRateStat("tunnel.testSuccessTime",
"How long do successful tunnel tests take?", "Tunnels",
new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
@ -254,6 +242,8 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
public void buildTunnels(Destination client, ClientTunnelSettings settings) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Building tunnels for the client " + client.calculateHash().toBase64() + ": " + settings);
Hash dest = client.calculateHash();
settings.getInboundSettings().setDestination(dest);
settings.getOutboundSettings().setDestination(dest);
@ -264,7 +254,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
inbound = (TunnelPool)_clientInboundPools.get(dest);
if (inbound == null) {
inbound = new TunnelPool(_context, this, settings.getInboundSettings(),
new ClientPeerSelector(), new TunnelBuilder());
new ClientPeerSelector());
_clientInboundPools.put(dest, inbound);
} else {
inbound.setSettings(settings.getInboundSettings());
@ -274,7 +264,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
outbound = (TunnelPool)_clientOutboundPools.get(dest);
if (outbound == null) {
outbound = new TunnelPool(_context, this, settings.getOutboundSettings(),
new ClientPeerSelector(), new TunnelBuilder());
new ClientPeerSelector());
_clientOutboundPools.put(dest, outbound);
} else {
outbound.setSettings(settings.getOutboundSettings());
@ -306,70 +296,34 @@ public class TunnelPoolManager implements TunnelManagerFacade {
outbound.shutdown();
}
/**
* Check to make sure we can build this many new tunnels (throttled so
* we don't build too many at a time across all pools).
*
* @param wanted how many tunnels the pool wants to build
* @return how many are allowed to be built
*/
int allocateBuilds(int wanted) {
boolean shouldThrottle = shouldThrottleTunnels();
if (!shouldThrottle) return wanted;
synchronized (this) {
if (_outstandingBuilds >= _maxOutstandingBuilds) {
// ok, as a failsafe, always let one through
// nah, its failsafe for a reason. fix the cause.
//_outstandingBuilds++;
//return 1;
return 0;
}
if (_outstandingBuilds + wanted < _maxOutstandingBuilds) {
_outstandingBuilds += wanted;
return wanted;
} else {
int allowed = _maxOutstandingBuilds - _outstandingBuilds;
_outstandingBuilds = _maxOutstandingBuilds;
return allowed;
}
}
}
private boolean shouldThrottleTunnels() {
Boolean rv = Boolean.valueOf(_context.getProperty(PROP_THROTTLE_CONCURRENT_TUNNELS, ""+DEFAULT_THROTTLE_CONCURRENT_TUNNELS));
return rv.booleanValue();
}
void buildComplete(TunnelCreatorConfig cfg) {
buildComplete();
_loadTestManager.addTunnelTestCandidate(cfg);
}
void buildComplete() {
synchronized (this) {
if (_outstandingBuilds > 0)
_outstandingBuilds--;
}
}
void buildComplete() {}
private static final String PROP_LOAD_TEST = "router.loadTest";
public void startup() {
TunnelBuilder builder = new TunnelBuilder();
_isShutdown = false;
if (!_executor.isRunning()) {
I2PThread t = new I2PThread(_executor, "BuildExecutor");
t.setDaemon(true);
t.start();
}
ExploratoryPeerSelector selector = new ExploratoryPeerSelector();
TunnelPoolSettings inboundSettings = new TunnelPoolSettings();
inboundSettings.setIsExploratory(true);
inboundSettings.setIsInbound(true);
_inboundExploratory = new TunnelPool(_context, this, inboundSettings, selector, builder);
_inboundExploratory = new TunnelPool(_context, this, inboundSettings, selector);
_inboundExploratory.startup();
try { Thread.sleep(3*1000); } catch (InterruptedException ie) {}
TunnelPoolSettings outboundSettings = new TunnelPoolSettings();
outboundSettings.setIsExploratory(true);
outboundSettings.setIsInbound(false);
_outboundExploratory = new TunnelPool(_context, this, outboundSettings, selector, builder);
_outboundExploratory = new TunnelPool(_context, this, outboundSettings, selector);
_outboundExploratory.startup();
// try to build up longer tunnels
@ -399,8 +353,26 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_inboundExploratory.shutdown();
if (_outboundExploratory != null)
_outboundExploratory.shutdown();
_isShutdown = true;
}
/** list of TunnelPool instances currently in play */
void listPools(List out) {
synchronized (_clientInboundPools) {
out.addAll(_clientInboundPools.values());
}
synchronized (_clientOutboundPools) {
out.addAll(_clientOutboundPools.values());
}
if (_inboundExploratory != null)
out.add(_inboundExploratory);
if (_outboundExploratory != null)
out.add(_outboundExploratory);
}
void tunnelFailed() { _executor.repoll(); }
BuildExecutor getExecutor() { return _executor; }
boolean isShutdown() { return _isShutdown; }
public void renderStatusHTML(Writer out) throws IOException {
out.write("<h2><a name=\"exploratory\">Exploratory tunnels</a> (<a href=\"/configtunnels.jsp#exploratory\">config</a>):</h2>\n");
renderPool(out, _inboundExploratory, _outboundExploratory);
@ -439,8 +411,13 @@ public class TunnelPoolManager implements TunnelManagerFacade {
RateStat rs = _context.statManager().getRate("tunnel.participatingMessageCount");
if (rs != null)
processed = (long)rs.getRate(10*60*1000).getLifetimeTotalValue();
int inactive = 0;
for (int i = 0; i < participating.size(); i++) {
HopConfig cfg = (HopConfig)participating.get(i);
if (cfg.getProcessedMessagesCount() <= 0) {
inactive++;
continue;
}
out.write("<tr>");
if (cfg.getReceiveTunnel() != null)
out.write("<td>" + cfg.getReceiveTunnel().getTunnelId() +"</td>");
@ -468,6 +445,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
processed += cfg.getProcessedMessagesCount();
}
out.write("</table>\n");
out.write("Inactive participating tunnels: " + inactive + "<br />\n");
out.write("Lifetime bandwidth usage: " + processed + "KB<br />\n");
}
@ -512,9 +490,22 @@ public class TunnelPoolManager implements TunnelManagerFacade {
else
processedOut += info.getProcessedMessagesCount();
}
if (live <= 0)
out.write("<tr><td colspan=\"3\">No tunnels, waiting for the grace period to end</td></tr>\n");
out.write("</table>\n");
List pending = in.listPending();
for (int i = 0; i < pending.size(); i++) {
TunnelInfo info = (TunnelInfo)pending.get(i);
out.write("In progress: <code>" + info.toString() + "</code><br />\n");
}
live += pending.size();
pending = outPool.listPending();
for (int i = 0; i < pending.size(); i++) {
TunnelInfo info = (TunnelInfo)pending.get(i);
out.write("In progress: <code>" + info.toString() + "</code><br />\n");
}
live += pending.size();
if (live <= 0)
out.write("<b>No tunnels, waiting for the grace period to end</b><br />\n");
out.write("Lifetime bandwidth usage: " + processedIn + "KB in, " + processedOut + "KB out<br />");
}
}