propagate from branch 'i2p.i2p.zzz.test' (head f19c9c4ae55d6ae82d6c028a06c0fae886da2527)
to branch 'i2p.i2p' (head 78d8ece1514216315644bbef224c62e1e9fbe370)
This commit is contained in:
@ -65,7 +65,6 @@ public class Router {
|
||||
private I2PThread.OOMEventListener _oomListener;
|
||||
private ShutdownHook _shutdownHook;
|
||||
private I2PThread _gracefulShutdownDetector;
|
||||
private Set _shutdownTasks;
|
||||
|
||||
public final static String PROP_CONFIG_FILE = "router.configLocation";
|
||||
|
||||
@ -171,7 +170,6 @@ public class Router {
|
||||
watchdog.setDaemon(true);
|
||||
watchdog.start();
|
||||
|
||||
_shutdownTasks = new HashSet(0);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -446,13 +444,14 @@ public class Router {
|
||||
*/
|
||||
private static final String _rebuildFiles[] = new String[] { "router.info",
|
||||
"router.keys",
|
||||
"netDb/my.info",
|
||||
"connectionTag.keys",
|
||||
"netDb/my.info", // no longer used
|
||||
"connectionTag.keys", // never used?
|
||||
"keyBackup/privateEncryption.key",
|
||||
"keyBackup/privateSigning.key",
|
||||
"keyBackup/publicEncryption.key",
|
||||
"keyBackup/publicSigning.key",
|
||||
"sessionKeys.dat" };
|
||||
"sessionKeys.dat" // no longer used
|
||||
};
|
||||
|
||||
static final String IDENTLOG = "identlog.txt";
|
||||
public static void killKeys() {
|
||||
@ -490,13 +489,12 @@ public class Router {
|
||||
*/
|
||||
public void rebuildNewIdentity() {
|
||||
killKeys();
|
||||
try {
|
||||
for (Iterator iter = _shutdownTasks.iterator(); iter.hasNext(); ) {
|
||||
Runnable task = (Runnable)iter.next();
|
||||
for (Runnable task : _context.getShutdownTasks()) {
|
||||
try {
|
||||
task.run();
|
||||
} catch (Throwable t) {
|
||||
_log.log(Log.CRIT, "Error running shutdown task", t);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
_log.log(Log.CRIT, "Error running shutdown task", t);
|
||||
}
|
||||
// hard and ugly
|
||||
finalShutdown(EXIT_HARD_RESTART);
|
||||
@ -781,12 +779,6 @@ public class Router {
|
||||
buf.setLength(0);
|
||||
}
|
||||
|
||||
public void addShutdownTask(Runnable task) {
|
||||
synchronized (_shutdownTasks) {
|
||||
_shutdownTasks.add(task);
|
||||
}
|
||||
}
|
||||
|
||||
public static final int EXIT_GRACEFUL = 2;
|
||||
public static final int EXIT_HARD = 3;
|
||||
public static final int EXIT_OOM = 10;
|
||||
@ -799,13 +791,12 @@ public class Router {
|
||||
I2PThread.removeOOMEventListener(_oomListener);
|
||||
// Run the shutdown hooks first in case they want to send some goodbye messages
|
||||
// Maybe we need a delay after this too?
|
||||
try {
|
||||
for (Iterator iter = _shutdownTasks.iterator(); iter.hasNext(); ) {
|
||||
Runnable task = (Runnable)iter.next();
|
||||
for (Runnable task : _context.getShutdownTasks()) {
|
||||
try {
|
||||
task.run();
|
||||
} catch (Throwable t) {
|
||||
_log.log(Log.CRIT, "Error running shutdown task", t);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
_log.log(Log.CRIT, "Error running shutdown task", t);
|
||||
}
|
||||
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the client manager", t); }
|
||||
try { _context.jobQueue().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the job queue", t); }
|
||||
@ -859,6 +850,10 @@ public class Router {
|
||||
public void shutdownGracefully() {
|
||||
shutdownGracefully(EXIT_GRACEFUL);
|
||||
}
|
||||
/**
|
||||
* Call this with EXIT_HARD or EXIT_HARD_RESTART for a non-blocking,
|
||||
* hard, non-graceful shutdown with a brief delay to allow a UI response
|
||||
*/
|
||||
public void shutdownGracefully(int exitCode) {
|
||||
_gracefulExitCode = exitCode;
|
||||
_config.setProperty(PROP_SHUTDOWN_IN_PROGRESS, "true");
|
||||
@ -887,7 +882,9 @@ public class Router {
|
||||
}
|
||||
/** How long until the graceful shutdown will kill us? */
|
||||
public long getShutdownTimeRemaining() {
|
||||
if (_gracefulExitCode <= 0) return -1;
|
||||
if (_gracefulExitCode <= 0) return -1; // maybe Long.MAX_VALUE would be better?
|
||||
if (_gracefulExitCode == EXIT_HARD || _gracefulExitCode == EXIT_HARD_RESTART)
|
||||
return 0;
|
||||
long exp = _context.tunnelManager().getLastParticipatingExpiration();
|
||||
if (exp < 0)
|
||||
return -1;
|
||||
@ -906,9 +903,20 @@ public class Router {
|
||||
while (true) {
|
||||
boolean shutdown = (null != _config.getProperty(PROP_SHUTDOWN_IN_PROGRESS));
|
||||
if (shutdown) {
|
||||
if (_context.tunnelManager().getParticipatingCount() <= 0) {
|
||||
if (_log.shouldLog(Log.CRIT))
|
||||
if (_gracefulExitCode == EXIT_HARD || _gracefulExitCode == EXIT_HARD_RESTART ||
|
||||
_context.tunnelManager().getParticipatingCount() <= 0) {
|
||||
if (_gracefulExitCode == EXIT_HARD)
|
||||
_log.log(Log.CRIT, "Shutting down after a brief delay");
|
||||
else if (_gracefulExitCode == EXIT_HARD_RESTART)
|
||||
_log.log(Log.CRIT, "Restarting after a brief delay");
|
||||
else
|
||||
_log.log(Log.CRIT, "Graceful shutdown progress - no more tunnels, safe to die");
|
||||
// Allow time for a UI reponse
|
||||
try {
|
||||
synchronized (Thread.currentThread()) {
|
||||
Thread.currentThread().wait(2*1000);
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
shutdown(_gracefulExitCode);
|
||||
return;
|
||||
} else {
|
||||
|
@ -34,6 +34,8 @@ import net.i2p.router.Router;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelInfo;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleScheduler;
|
||||
import net.i2p.util.SimpleTimer;
|
||||
|
||||
/**
|
||||
* Send a client message out a random outbound tunnel and into a random inbound
|
||||
@ -98,6 +100,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
*/
|
||||
private static final int BUNDLE_PROBABILITY_DEFAULT = 100;
|
||||
|
||||
private static final Object _initializeLock = new Object();
|
||||
private static boolean _initialized = false;
|
||||
private static final int CLEAN_INTERVAL = 5*60*1000;
|
||||
|
||||
/**
|
||||
* Send the sucker
|
||||
*/
|
||||
@ -105,20 +111,26 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
super(ctx);
|
||||
_log = ctx.logManager().getLog(OutboundClientMessageOneShotJob.class);
|
||||
|
||||
ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.sendAckTime", "Message round trip time", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look for a remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchNoACK", "Repeated message sends to a peer (no ack required)", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l });
|
||||
synchronized (_initializeLock) {
|
||||
if (!_initialized) {
|
||||
SimpleScheduler.getInstance().addPeriodicEvent(new OCMOSJCacheCleaner(ctx), CLEAN_INTERVAL, CLEAN_INTERVAL);
|
||||
ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.sendAckTime", "Message round trip time", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look for a remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
ctx.statManager().createRateStat("client.dispatchNoACK", "Repeated message sends to a peer (no ack required)", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l });
|
||||
_initialized = true;
|
||||
}
|
||||
}
|
||||
long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
|
||||
_clientMessage = msg;
|
||||
_clientMessageId = msg.getMessageId();
|
||||
@ -201,7 +213,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* Key the cache on the source+dest pair.
|
||||
*/
|
||||
private static HashMap _leaseSetCache = new HashMap();
|
||||
private static long _lscleanTime = 0;
|
||||
private LeaseSet getReplyLeaseSet(boolean force) {
|
||||
LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
|
||||
if (newLS == null)
|
||||
@ -235,10 +246,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
// If the last leaseSet we sent him is still good, don't bother sending again
|
||||
long now = getContext().clock().now();
|
||||
synchronized (_leaseSetCache) {
|
||||
if (now - _lscleanTime > 5*60*1000) { // clean out periodically
|
||||
cleanLeaseSetCache(_leaseSetCache);
|
||||
_lscleanTime = now;
|
||||
}
|
||||
if (!force) {
|
||||
LeaseSet ls = (LeaseSet) _leaseSetCache.get(hashPair());
|
||||
if (ls != null) {
|
||||
@ -306,7 +313,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private static HashMap _leaseCache = new HashMap();
|
||||
private static long _lcleanTime = 0;
|
||||
private boolean getNextLease() {
|
||||
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash());
|
||||
if (_leaseSet == null) {
|
||||
@ -319,10 +325,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
// Use the same lease if it's still good
|
||||
// Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
|
||||
synchronized (_leaseCache) {
|
||||
if (now - _lcleanTime > 5*60*1000) { // clean out periodically
|
||||
cleanLeaseCache(_leaseCache);
|
||||
_lcleanTime = now;
|
||||
}
|
||||
_lease = (Lease) _leaseCache.get(hashPair());
|
||||
if (_lease != null) {
|
||||
// if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
|
||||
@ -607,7 +609,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* (needed for cleanTunnelCache)
|
||||
* 44 = 32 * 4 / 3
|
||||
*/
|
||||
private Hash sourceFromHashPair(String s) {
|
||||
private static Hash sourceFromHashPair(String s) {
|
||||
return new Hash(Base64.decode(s.substring(44, 88)));
|
||||
}
|
||||
|
||||
@ -648,8 +650,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* Clean out old leaseSets from a set.
|
||||
* Caller must synchronize on tc.
|
||||
*/
|
||||
private void cleanLeaseSetCache(HashMap tc) {
|
||||
long now = getContext().clock().now();
|
||||
private static void cleanLeaseSetCache(RouterContext ctx, HashMap tc) {
|
||||
long now = ctx.clock().now();
|
||||
List deleteList = new ArrayList();
|
||||
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
|
||||
Map.Entry entry = (Map.Entry)iter.next();
|
||||
@ -668,7 +670,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* Clean out old leases from a set.
|
||||
* Caller must synchronize on tc.
|
||||
*/
|
||||
private void cleanLeaseCache(HashMap tc) {
|
||||
private static void cleanLeaseCache(HashMap tc) {
|
||||
List deleteList = new ArrayList();
|
||||
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
|
||||
Map.Entry entry = (Map.Entry)iter.next();
|
||||
@ -687,13 +689,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
* Clean out old tunnels from a set.
|
||||
* Caller must synchronize on tc.
|
||||
*/
|
||||
private void cleanTunnelCache(HashMap tc) {
|
||||
private static void cleanTunnelCache(RouterContext ctx, HashMap tc) {
|
||||
List deleteList = new ArrayList();
|
||||
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
|
||||
Map.Entry entry = (Map.Entry)iter.next();
|
||||
String k = (String) entry.getKey();
|
||||
TunnelInfo tunnel = (TunnelInfo) entry.getValue();
|
||||
if (!getContext().tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
|
||||
if (!ctx.tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
|
||||
deleteList.add(k);
|
||||
}
|
||||
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
|
||||
@ -702,6 +704,25 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
}
|
||||
}
|
||||
|
||||
private static class OCMOSJCacheCleaner implements SimpleTimer.TimedEvent {
|
||||
private RouterContext _ctx;
|
||||
private OCMOSJCacheCleaner(RouterContext ctx) {
|
||||
_ctx = ctx;
|
||||
}
|
||||
public void timeReached() {
|
||||
synchronized(_leaseSetCache) {
|
||||
cleanLeaseSetCache(_ctx, _leaseSetCache);
|
||||
}
|
||||
synchronized(_leaseCache) {
|
||||
cleanLeaseCache(_leaseCache);
|
||||
}
|
||||
synchronized(_tunnelCache) {
|
||||
cleanTunnelCache(_ctx, _tunnelCache);
|
||||
cleanTunnelCache(_ctx, _backloggedTunnelCache);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use the same outbound tunnel as we did for the same destination previously,
|
||||
* if possible, to keep the streaming lib happy
|
||||
@ -712,16 +733,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
*/
|
||||
private static HashMap _tunnelCache = new HashMap();
|
||||
private static HashMap _backloggedTunnelCache = new HashMap();
|
||||
private static long _cleanTime = 0;
|
||||
private TunnelInfo selectOutboundTunnel(Destination to) {
|
||||
TunnelInfo tunnel;
|
||||
long now = getContext().clock().now();
|
||||
synchronized (_tunnelCache) {
|
||||
if (now - _cleanTime > 5*60*1000) { // clean out periodically
|
||||
cleanTunnelCache(_tunnelCache);
|
||||
cleanTunnelCache(_backloggedTunnelCache);
|
||||
_cleanTime = now;
|
||||
}
|
||||
/**
|
||||
* If old tunnel is valid and no longer backlogged, use it.
|
||||
* This prevents an active anonymity attack, where a peer could tell
|
||||
|
Reference in New Issue
Block a user