* Router Clock: First cut at recognizing and reacting to large system

clock shifts by partially restarting the router. Also improve
    restarts initiated from config.jsp
    Tickets #465, #468, #494
  * UPnP: Wait for a while to ensure port removal at shutdown or restart
This commit is contained in:
zzz
2011-07-10 00:00:58 +00:00
parent 4fd1800944
commit 42acdc314a
8 changed files with 209 additions and 58 deletions

View File

@ -271,11 +271,12 @@ public class ConfigNetHandler extends FormHandler {
if (switchRequired) { if (switchRequired) {
hiddenSwitch(); hiddenSwitch();
} else if (restartRequired) { } else if (restartRequired) {
if (_context.hasWrapper()) { //if (_context.hasWrapper()) {
// Wow this dumps all conns immediately and really isn't nice // Wow this dumps all conns immediately and really isn't nice
addFormNotice("Performing a soft restart"); addFormNotice("Performing a soft restart");
_context.router().restart(); _context.router().restart();
addFormNotice("Soft restart complete"); // restart() returns immediately now
//addFormNotice("Soft restart complete");
// Most of the time we aren't changing addresses, just enabling or disabling // Most of the time we aren't changing addresses, just enabling or disabling
// things, so let's try just a new routerInfo and see how that works. // things, so let's try just a new routerInfo and see how that works.
@ -285,14 +286,12 @@ public class ConfigNetHandler extends FormHandler {
// So don't do this... // So don't do this...
//_context.router().rebuildRouterInfo(); //_context.router().rebuildRouterInfo();
//addFormNotice("Router Info rebuilt"); //addFormNotice("Router Info rebuilt");
} else { //} else {
// There's a few changes that don't really require restart (e.g. enabling inbound TCP) // There's a few changes that don't really require restart (e.g. enabling inbound TCP)
// But it would be hard to get right, so just do a restart. // But it would be hard to get right, so just do a restart.
addFormError(_("Gracefully restarting I2P to change published router address")); //addFormError(_("Gracefully restarting I2P to change published router address"));
if (_context.hasWrapper()) //_context.router().shutdownGracefully(Router.EXIT_GRACEFUL_RESTART);
_context.addShutdownTask(new ConfigServiceHandler.UpdateWrapperManagerTask(Router.EXIT_GRACEFUL_RESTART)); //}
_context.router().shutdownGracefully(Router.EXIT_GRACEFUL_RESTART);
}
} }
} }

View File

@ -17,7 +17,7 @@ import net.i2p.util.Log;
* forever. * forever.
*/ */
public class Timestamper implements Runnable { public class Timestamper implements Runnable {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private Log _log;
private final List<String> _servers; private final List<String> _servers;
private List<String> _priorityServers; private List<String> _priorityServers;
@ -26,7 +26,7 @@ public class Timestamper implements Runnable {
private int _concurringServers; private int _concurringServers;
private int _consecutiveFails; private int _consecutiveFails;
private volatile boolean _disabled; private volatile boolean _disabled;
private boolean _daemon; private final boolean _daemon;
private boolean _initialized; private boolean _initialized;
private boolean _wellSynced; private boolean _wellSynced;
private volatile boolean _isRunning; private volatile boolean _isRunning;
@ -60,6 +60,10 @@ public class Timestamper implements Runnable {
// moved here to prevent problems with synchronized statements. // moved here to prevent problems with synchronized statements.
_servers = new ArrayList(3); _servers = new ArrayList(3);
_listeners = new CopyOnWriteArrayList(); _listeners = new CopyOnWriteArrayList();
_context = ctx;
_daemon = daemon;
// DO NOT initialize _log here, stack overflow via LogManager init loop
// Don't bother starting a thread if we are disabled. // Don't bother starting a thread if we are disabled.
// This means we no longer check every 5 minutes to see if we got enabled, // This means we no longer check every 5 minutes to see if we got enabled,
// so the property must be set at startup. // so the property must be set at startup.
@ -69,10 +73,6 @@ public class Timestamper implements Runnable {
_initialized = true; _initialized = true;
return; return;
} }
_context = ctx;
_daemon = daemon;
_initialized = false;
_wellSynced = false;
if (lsnr != null) if (lsnr != null)
_listeners.add(lsnr); _listeners.add(lsnr);
updateConfig(); updateConfig();
@ -124,6 +124,15 @@ public class Timestamper implements Runnable {
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
/**
* Update the time immediately.
* @since 0.8.8
*/
public void timestampNow() {
if (_initialized && _isRunning && (!_disabled) && _timestamperThread != null)
_timestamperThread.interrupt();
}
/** @since 0.8.8 */ /** @since 0.8.8 */
private class Shutdown implements Runnable { private class Shutdown implements Runnable {
public void run() { public void run() {

View File

@ -1,6 +1,5 @@
package net.i2p.util; package net.i2p.util;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
@ -21,11 +20,11 @@ import net.i2p.time.Timestamper;
public class Clock implements Timestamper.UpdateListener { public class Clock implements Timestamper.UpdateListener {
protected final I2PAppContext _context; protected final I2PAppContext _context;
private final Timestamper _timestamper; private final Timestamper _timestamper;
protected final long _startedOn; protected long _startedOn;
protected boolean _statCreated; protected boolean _statCreated;
protected volatile long _offset; protected volatile long _offset;
protected boolean _alreadyChanged; protected boolean _alreadyChanged;
private final Set _listeners; private final Set<ClockUpdateListener> _listeners;
public Clock(I2PAppContext context) { public Clock(I2PAppContext context) {
_context = context; _context = context;
@ -33,6 +32,7 @@ public class Clock implements Timestamper.UpdateListener {
_timestamper = new Timestamper(context, this); _timestamper = new Timestamper(context, this);
_startedOn = System.currentTimeMillis(); _startedOn = System.currentTimeMillis();
} }
public static Clock getInstance() { public static Clock getInstance() {
return I2PAppContext.getGlobalContext().clock(); return I2PAppContext.getGlobalContext().clock();
} }
@ -151,13 +151,17 @@ public class Clock implements Timestamper.UpdateListener {
} }
protected void fireOffsetChanged(long delta) { protected void fireOffsetChanged(long delta) {
for (Iterator iter = _listeners.iterator(); iter.hasNext();) { for (ClockUpdateListener lsnr : _listeners) {
ClockUpdateListener lsnr = (ClockUpdateListener) iter.next();
lsnr.offsetChanged(delta); lsnr.offsetChanged(delta);
} }
} }
public static interface ClockUpdateListener { public interface ClockUpdateListener {
/**
* @param delta = (new offset - old offset),
* where each offset = (now() - System.currentTimeMillis())
*/
public void offsetChanged(long delta); public void offsetChanged(long delta);
} }
} }

View File

@ -58,7 +58,7 @@ import net.i2p.util.SimpleTimer;
* Main driver for the router. * Main driver for the router.
* *
*/ */
public class Router { public class Router implements RouterClock.ClockShiftListener {
private Log _log; private Log _log;
private RouterContext _context; private RouterContext _context;
private final Map<String, String> _config; private final Map<String, String> _config;
@ -70,7 +70,7 @@ public class Router {
private boolean _higherVersionSeen; private boolean _higherVersionSeen;
//private SessionKeyPersistenceHelper _sessionKeyPersistenceHelper; //private SessionKeyPersistenceHelper _sessionKeyPersistenceHelper;
private boolean _killVMOnEnd; private boolean _killVMOnEnd;
private boolean _isAlive; private volatile boolean _isAlive;
private int _gracefulExitCode; private int _gracefulExitCode;
private I2PThread.OOMEventListener _oomListener; private I2PThread.OOMEventListener _oomListener;
private ShutdownHook _shutdownHook; private ShutdownHook _shutdownHook;
@ -351,9 +351,15 @@ public class Router {
/** /**
* True if the router has tried to communicate with another router who is running a higher * True if the router has tried to communicate with another router who is running a higher
* incompatible protocol version. * incompatible protocol version.
* * @deprecated unused
*/ */
public boolean getHigherVersionSeen() { return _higherVersionSeen; } public boolean getHigherVersionSeen() { return _higherVersionSeen; }
/**
* True if the router has tried to communicate with another router who is running a higher
* incompatible protocol version.
* @deprecated unused
*/
public void setHigherVersionSeen(boolean seen) { _higherVersionSeen = seen; } public void setHigherVersionSeen(boolean seen) { _higherVersionSeen = seen; }
public long getWhenStarted() { return _started; } public long getWhenStarted() { return _started; }
@ -361,7 +367,7 @@ public class Router {
/** wall clock uptime */ /** wall clock uptime */
public long getUptime() { public long getUptime() {
if ( (_context == null) || (_context.clock() == null) ) return 1; // racing on startup if ( (_context == null) || (_context.clock() == null) ) return 1; // racing on startup
return _context.clock().now() - _context.clock().getOffset() - _started; return Math.max(1, _context.clock().now() - _context.clock().getOffset() - _started);
} }
public RouterContext getContext() { return _context; } public RouterContext getContext() { return _context; }
@ -961,7 +967,11 @@ public class Router {
public static final int EXIT_HARD_RESTART = 4; public static final int EXIT_HARD_RESTART = 4;
public static final int EXIT_GRACEFUL_RESTART = 5; public static final int EXIT_GRACEFUL_RESTART = 5;
/**
* Shutdown with no chance of cancellation
*/
public void shutdown(int exitCode) { public void shutdown(int exitCode) {
((RouterClock) _context.clock()).removeShiftListener(this);
_isAlive = false; _isAlive = false;
_context.random().saveSeed(); _context.random().saveSeed();
I2PThread.removeOOMEventListener(_oomListener); I2PThread.removeOOMEventListener(_oomListener);
@ -1199,33 +1209,77 @@ public class Router {
return true; return true;
} }
/**
* The clock shift listener.
* Restart the router if we should.
*
* @since 0.8.8
*/
public void clockShift(long delta) {
if (gracefulShutdownInProgress() || !_isAlive)
return;
if (delta > -60*1000 && delta < 60*1000)
return;
if (_context.commSystem().countActivePeers() <= 0)
return;
if (delta > 0)
_log.error("Restarting after large clock shift forward by " + DataHelper.formatDuration(delta));
else
_log.error("Restarting after large clock shift backward by " + DataHelper.formatDuration(0 - delta));
restart();
}
/** /**
* A "soft" restart, primarily of the comm system, after * A "soft" restart, primarily of the comm system, after
* a port change or large step-change in system time. * a port change or large step-change in system time.
* Does not stop the whole JVM, so it is safe even in the absence * Does not stop the whole JVM, so it is safe even in the absence
* of the wrapper. * of the wrapper.
* This is not a graceful restart - all peer connections are dropped. * This is not a graceful restart - all peer connections are dropped immediately.
*
* As of 0.8.8, this returns immediately and does the actual restart in a separate thread.
* Poll isAlive() if you need to know when the restart is complete.
*/ */
public void restart() { public synchronized void restart() {
if (gracefulShutdownInProgress() || !_isAlive)
return;
((RouterClock) _context.clock()).removeShiftListener(this);
_isAlive = false; _isAlive = false;
Thread t = new Thread(new Restarter(), "Router Restart");
t.start();
}
try { _context.commSystem().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the comm system", t); } /**
try { _context.clientManager().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the client manager", t); } * @since 0.8.8
try { _context.tunnelManager().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the tunnel manager", t); } */
try { _context.peerManager().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the peer manager", t); } private class Restarter implements Runnable {
try { _context.netDb().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the networkDb", t); } public void run() {
_started = _context.clock().now();
_log.error("Stopping the router for a restart...");
_log.logAlways(Log.WARN, "Stopping the client manager");
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error stopping the client manager", t); }
_log.logAlways(Log.WARN, "Stopping the comm system");
try { _context.commSystem().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the comm system", t); }
_log.logAlways(Log.WARN, "Stopping the tunnel manager");
try { _context.tunnelManager().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the tunnel manager", t); }
//try { _context.jobQueue().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the job queue", t); } //try { _context.peerManager().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the peer manager", t); }
//try { _context.netDb().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the networkDb", t); }
//try { _context.jobQueue().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the job queue", t); }
_log.log(Log.CRIT, "Restart teardown complete... "); _log.logAlways(Log.WARN, "Router teardown complete, restarting the router...");
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {} try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
_log.log(Log.CRIT, "Restarting..."); _log.logAlways(Log.WARN, "Restarting the comm system");
_log.logAlways(Log.WARN, "Restarting the tunnel manager");
_log.logAlways(Log.WARN, "Restarting the client manager");
try { _context.clientManager().startup(); } catch (Throwable t) { _log.log(Log.CRIT, "Error stopping the client manager", t); }
_isAlive = true; _isAlive = true;
_started = _context.clock().now(); rebuildRouterInfo();
_log.log(Log.CRIT, "Restart complete"); _log.logAlways(Log.WARN, "Restart complete");
((RouterClock) _context.clock()).addShiftListener(Router.this);
}
} }
public static void main(String args[]) { public static void main(String args[]) {

View File

@ -1,5 +1,9 @@
package net.i2p.router; package net.i2p.router;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import net.i2p.data.DataHelper;
import net.i2p.util.Clock; import net.i2p.util.Clock;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -26,6 +30,7 @@ public class RouterClock extends Clock {
private static final long MAX_SLEW = 50; private static final long MAX_SLEW = 50;
public static final int DEFAULT_STRATUM = 8; public static final int DEFAULT_STRATUM = 8;
private static final int WORST_STRATUM = 16; private static final int WORST_STRATUM = 16;
/** the max NTP Timestamper delay is 30m right now, make this longer than that */ /** the max NTP Timestamper delay is 30m right now, make this longer than that */
private static final long MIN_DELAY_FOR_WORSE_STRATUM = 45*60*1000; private static final long MIN_DELAY_FOR_WORSE_STRATUM = 45*60*1000;
private volatile long _desiredOffset; private volatile long _desiredOffset;
@ -34,12 +39,21 @@ public class RouterClock extends Clock {
private long _lastChanged; private long _lastChanged;
private int _lastStratum; private int _lastStratum;
private final RouterContext _contextRC; /**
* If the system clock shifts by this much (positive or negative),
* call the callback, we probably need a soft restart.
* @since 0.8.8
*/
private static final long MASSIVE_SHIFT = 75*1000;
private final Set<ClockShiftListener> _shiftListeners;
private volatile long _lastShiftNanos;
public RouterClock(RouterContext context) { public RouterClock(RouterContext context) {
super(context); super(context);
_contextRC = context;
_lastStratum = WORST_STRATUM; _lastStratum = WORST_STRATUM;
_lastSlewed = System.currentTimeMillis();
_shiftListeners = new CopyOnWriteArraySet();
_lastShiftNanos = System.nanoTime();
} }
/** /**
@ -98,11 +112,11 @@ public class RouterClock extends Clock {
} }
// If so configured, check sanity of proposed clock offset // If so configured, check sanity of proposed clock offset
if (_contextRC.getBooleanPropertyDefaultTrue("router.clockOffsetSanityCheck") && if (_context.getBooleanPropertyDefaultTrue("router.clockOffsetSanityCheck") &&
_alreadyChanged) { _alreadyChanged) {
// Try calculating peer clock skew // Try calculating peer clock skew
long currentPeerClockSkew = _contextRC.commSystem().getFramedAveragePeerClockSkew(50); long currentPeerClockSkew = ((RouterContext)_context).commSystem().getFramedAveragePeerClockSkew(50);
// Predict the effect of applying the proposed clock offset // Predict the effect of applying the proposed clock offset
long predictedPeerClockSkew = currentPeerClockSkew + delta; long predictedPeerClockSkew = currentPeerClockSkew + delta;
@ -131,10 +145,10 @@ public class RouterClock extends Clock {
getLog().info("Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum); getLog().info("Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum);
if (!_statCreated) { if (!_statCreated) {
_contextRC.statManager().createRequiredRateStat("clock.skew", "Clock step adjustment (ms)", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 }); _context.statManager().createRequiredRateStat("clock.skew", "Clock step adjustment (ms)", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 });
_statCreated = true; _statCreated = true;
} }
_contextRC.statManager().addRateData("clock.skew", delta, 0); _context.statManager().addRateData("clock.skew", delta, 0);
_desiredOffset = offsetMs; _desiredOffset = offsetMs;
} else { } else {
getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms, Stratum " + stratum); getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms, Stratum " + stratum);
@ -177,7 +191,12 @@ public class RouterClock extends Clock {
long systemNow = System.currentTimeMillis(); long systemNow = System.currentTimeMillis();
// copy the global, so two threads don't both increment or decrement _offset // copy the global, so two threads don't both increment or decrement _offset
long offset = _offset; long offset = _offset;
if (systemNow >= _lastSlewed + MAX_SLEW) { long sinceLastSlewed = systemNow - _lastSlewed;
if (sinceLastSlewed >= MASSIVE_SHIFT ||
sinceLastSlewed <= 0 - MASSIVE_SHIFT) {
_lastSlewed = systemNow;
notifyMassive(sinceLastSlewed);
} else if (sinceLastSlewed >= MAX_SLEW) {
// copy the global // copy the global
long desiredOffset = _desiredOffset; long desiredOffset = _desiredOffset;
if (desiredOffset > offset) { if (desiredOffset > offset) {
@ -196,6 +215,66 @@ public class RouterClock extends Clock {
return offset + systemNow; return offset + systemNow;
} }
/*
* A large system clock shift happened. Tell people about it.
*
* @since 0.8.8
*/
private void notifyMassive(long shift) {
long nowNanos = System.nanoTime();
// try to prevent dups, not guaranteed
// nanoTime() isn't guaranteed to be monotonic either :(
if (nowNanos < _lastShiftNanos + MASSIVE_SHIFT)
return;
_lastShiftNanos = nowNanos;
// reset these so the offset can be reset by the timestamper again
_startedOn = System.currentTimeMillis();
_alreadyChanged = false;
getTimestamper().timestampNow();
if (shift > 0)
getLog().log(Log.CRIT, "Large clock shift forward by " + DataHelper.formatDuration(shift));
else
getLog().log(Log.CRIT, "Large clock shift backward by " + DataHelper.formatDuration(0 - shift));
for (ClockShiftListener lsnr : _shiftListeners) {
lsnr.clockShift(shift);
}
}
/*
* Get notified of massive System clock shifts, positive or negative -
* generally a minute or more.
* The adjusted (offset) clock changes by the same amount.
* The offset itself did not change.
* Warning - duplicate notifications may occur.
*
* @since 0.8.8
*/
public void addShiftListener(ClockShiftListener lsnr) {
_shiftListeners.add(lsnr);
}
/*
* @since 0.8.8
*/
public void removeShiftListener(ClockShiftListener lsnr) {
_shiftListeners.remove(lsnr);
}
/*
* @since 0.8.8
*/
public interface ClockShiftListener {
/**
* @param delta The system clock and adjusted clock just changed by this much,
* in milliseconds (approximately)
*/
public void clockShift(long delta);
}
/* /*
* How far we still have to slew, for diagnostics * How far we still have to slew, for diagnostics
* @since 0.7.12 * @since 0.7.12
@ -204,5 +283,4 @@ public class RouterClock extends Clock {
public long getDeltaOffset() { public long getDeltaOffset() {
return _desiredOffset - _offset; return _desiredOffset - _offset;
} }
} }

View File

@ -91,15 +91,8 @@ class ClientManager {
// to let the old listener die // to let the old listener die
try { Thread.sleep(2*1000); } catch (InterruptedException ie) {} try { Thread.sleep(2*1000); } catch (InterruptedException ie) {}
int port = ClientManagerFacadeImpl.DEFAULT_PORT; int port = _ctx.getProperty(ClientManagerFacadeImpl.PROP_CLIENT_PORT,
String portStr = _ctx.router().getConfigSetting(ClientManagerFacadeImpl.PROP_CLIENT_PORT); ClientManagerFacadeImpl.DEFAULT_PORT);
if (portStr != null) {
try {
port = Integer.parseInt(portStr);
} catch (NumberFormatException nfe) {
_log.error("Error setting the port: " + portStr + " is not valid", nfe);
}
}
startListeners(port); startListeners(port);
} }

View File

@ -11,6 +11,7 @@ package net.i2p.router.startup;
import net.i2p.router.Job; import net.i2p.router.Job;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.RouterClock;
import net.i2p.util.Log; import net.i2p.util.Log;
/** This actually boots almost everything */ /** This actually boots almost everything */
@ -45,6 +46,7 @@ public class BootCommSystemJob extends JobImpl {
getContext().jobQueue().addJob(new StartAcceptingClientsJob(getContext())); getContext().jobQueue().addJob(new StartAcceptingClientsJob(getContext()));
getContext().jobQueue().addJob(new ReadConfigJob(getContext())); getContext().jobQueue().addJob(new ReadConfigJob(getContext()));
((RouterClock) getContext().clock()).addShiftListener(getContext().router());
} }
private void startupDb() { private void startupDb() {

View File

@ -91,8 +91,20 @@ class UPnP extends ControlPoint implements DeviceChangeListener, EventListener {
return super.start(); return super.start();
} }
/**
* WARNING - Blocking up to 2 seconds
*/
public void terminate() { public void terminate() {
// this gets spun off in a thread...
unregisterPortMappings(); unregisterPortMappings();
// If we stop too early and we've forwarded multiple ports,
// the later ones don't get unregistered
int i = 0;
while (i++ < 20 && !portsForwarded.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {}
}
super.stop(); super.stop();
_router = null; _router = null;
_service = null; _service = null;
@ -672,7 +684,7 @@ class UPnP extends ControlPoint implements DeviceChangeListener, EventListener {
*/ */
private void unregisterPorts(Set<ForwardPort> portsToForwardNow) { private void unregisterPorts(Set<ForwardPort> portsToForwardNow) {
Thread t = new Thread(new UnregisterPortsThread(portsToForwardNow)); Thread t = new Thread(new UnregisterPortsThread(portsToForwardNow));
t.setName("UPnP Port Opener " + (++__id)); t.setName("UPnP Port Closer " + (++__id));
t.setDaemon(true); t.setDaemon(true);
t.start(); t.start();
} }