* Message Registry: Clear pending messages at restart / shutdown

* OCMOSJ: Clear caches at restart
  * UPnP:
    - Fix device rediscovery and port opening after restart
This commit is contained in:
zzz
2011-07-10 15:04:42 +00:00
parent 42acdc314a
commit 3455d3f943
9 changed files with 83 additions and 8 deletions

View File

@ -1,3 +1,20 @@
2011-07-10 zzz
* DH, YK:
- Improve YK speed test
- Shut down thread faster
- Refiller keeps going until full
- Cleanups
* I2PTunnel: Fix a shutdown hang
* Message Registry: Clear pending messages at restart / shutdown
* OCMOSJ: Clear caches at restart
* Router Clock: First cut at recognizing and reacting to large system
clock shifts by partially restarting the router. Also improve
restarts initiated from config.jsp
Tickets #465, #468, #494
* UPnP:
- Wait for a while to ensure port removal at shutdown or restart
- Fix device rediscovery and port opening after restart
2011-07-08 zzz 2011-07-08 zzz
* Findbugs: Several fixes and cleanups * Findbugs: Several fixes and cleanups
* I2NP: Consolidate common code from TunnelBuildMessage and * I2NP: Consolidate common code from TunnelBuildMessage and

View File

@ -39,6 +39,13 @@ public class ClientMessagePool {
OutboundClientMessageOneShotJob.clearAllCaches(); OutboundClientMessageOneShotJob.clearAllCaches();
} }
/**
* @since 0.8.8
*/
public void restart() {
shutdown();
}
/** /**
* Add a new message to the pool. The message can either be locally or * Add a new message to the pool. The message can either be locally or
* remotely destined. * remotely destined.

View File

@ -321,11 +321,15 @@ public class InNetMessagePool implements Service {
} }
public void renderStatusHTML(Writer out) {} public void renderStatusHTML(Writer out) {}
/** does nothing since we aren't threaded */
public void restart() { public void restart() {
shutdown(); shutdown();
try { Thread.sleep(100); } catch (InterruptedException ie) {} try { Thread.sleep(100); } catch (InterruptedException ie) {}
startup(); startup();
} }
/** does nothing since we aren't threaded */
public void shutdown() { public void shutdown() {
_alive = false; _alive = false;
if (!DISPATCH_DIRECT) { if (!DISPATCH_DIRECT) {
@ -337,6 +341,7 @@ public class InNetMessagePool implements Service {
} }
} }
/** does nothing since we aren't threaded */
public void startup() { public void startup() {
_alive = true; _alive = true;
_dispatchThreaded = DEFAULT_DISPATCH_THREADED; _dispatchThreaded = DEFAULT_DISPATCH_THREADED;

View File

@ -1258,6 +1258,7 @@ public class Router implements RouterClock.ClockShiftListener {
_log.logAlways(Log.WARN, "Stopping the client manager"); _log.logAlways(Log.WARN, "Stopping the client manager");
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error stopping the client manager", t); } try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error stopping the client manager", t); }
_log.logAlways(Log.WARN, "Stopping the comm system"); _log.logAlways(Log.WARN, "Stopping the comm system");
try { _context.messageRegistry().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the message registry", t); }
try { _context.commSystem().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the comm system", t); } try { _context.commSystem().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the comm system", t); }
_log.logAlways(Log.WARN, "Stopping the tunnel manager"); _log.logAlways(Log.WARN, "Stopping the tunnel manager");
try { _context.tunnelManager().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the tunnel manager", t); } try { _context.tunnelManager().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the tunnel manager", t); }
@ -1272,7 +1273,8 @@ public class Router implements RouterClock.ClockShiftListener {
_log.logAlways(Log.WARN, "Restarting the comm system"); _log.logAlways(Log.WARN, "Restarting the comm system");
_log.logAlways(Log.WARN, "Restarting the tunnel manager"); _log.logAlways(Log.WARN, "Restarting the tunnel manager");
_log.logAlways(Log.WARN, "Restarting the client manager"); _log.logAlways(Log.WARN, "Restarting the client manager");
try { _context.clientManager().startup(); } catch (Throwable t) { _log.log(Log.CRIT, "Error stopping the client manager", t); } try { _context.clientMessagePool().restart(); } catch (Throwable t) { _log.log(Log.CRIT, "Error restarting the CMP", t); }
try { _context.clientManager().startup(); } catch (Throwable t) { _log.log(Log.CRIT, "Error starting the client manager", t); }
_isAlive = true; _isAlive = true;
rebuildRouterInfo(); rebuildRouterInfo();

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 8; public final static long BUILD = 9;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@ -28,15 +28,15 @@ import net.i2p.util.Log;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
public class OutboundMessageRegistry { public class OutboundMessageRegistry {
private Log _log; private final Log _log;
/** list of currently active MessageSelector instances */ /** list of currently active MessageSelector instances */
private final List _selectors; private final List _selectors;
/** map of active MessageSelector to either an OutNetMessage or a List of OutNetMessages causing it (for quick removal) */ /** map of active MessageSelector to either an OutNetMessage or a List of OutNetMessages causing it (for quick removal) */
private final Map _selectorToMessage; private final Map _selectorToMessage;
/** set of active OutNetMessage (for quick removal and selector fetching) */ /** set of active OutNetMessage (for quick removal and selector fetching) */
private final Set _activeMessages; private final Set _activeMessages;
private CleanupTask _cleanupTask; private final CleanupTask _cleanupTask;
private RouterContext _context; private final RouterContext _context;
public OutboundMessageRegistry(RouterContext context) { public OutboundMessageRegistry(RouterContext context) {
_context = context; _context = context;
@ -47,7 +47,29 @@ public class OutboundMessageRegistry {
_cleanupTask = new CleanupTask(); _cleanupTask = new CleanupTask();
} }
public void shutdown() {} /**
* Does something @since 0.8.8
*/
public void shutdown() {
synchronized (_selectors) {
_selectors.clear();
}
synchronized (_selectorToMessage) {
_selectorToMessage.clear();
}
// Calling the fail job for every active message would
// be way too much at shutdown/restart, right?
synchronized (_activeMessages) {
_activeMessages.clear();
}
}
/**
* @since 0.8.8
*/
public void restart() {
shutdown();
}
/** /**
* Retrieve all messages that are waiting for the specified message. In * Retrieve all messages that are waiting for the specified message. In

View File

@ -88,6 +88,9 @@ class UPnP extends ControlPoint implements DeviceChangeListener, EventListener {
} }
public boolean runPlugin() { public boolean runPlugin() {
synchronized(lock) {
portsToForward = null;
}
return super.start(); return super.start();
} }
@ -95,6 +98,9 @@ class UPnP extends ControlPoint implements DeviceChangeListener, EventListener {
* WARNING - Blocking up to 2 seconds * WARNING - Blocking up to 2 seconds
*/ */
public void terminate() { public void terminate() {
synchronized(lock) {
portsToForward = null;
}
// this gets spun off in a thread... // this gets spun off in a thread...
unregisterPortMappings(); unregisterPortMappings();
// If we stop too early and we've forwarded multiple ports, // If we stop too early and we've forwarded multiple ports,
@ -621,7 +627,11 @@ class UPnP extends ControlPoint implements DeviceChangeListener, EventListener {
} }
portsToForward = ports; portsToForward = ports;
} }
if(_router == null) return; // When one is found, we will do the forwards if(_router == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No UPnP router available to update");
return; // When one is found, we will do the forwards
}
} }
if(portsToDumpNow != null) if(portsToDumpNow != null)
unregisterPorts(portsToDumpNow); unregisterPorts(portsToDumpNow);
@ -645,6 +655,8 @@ class UPnP extends ControlPoint implements DeviceChangeListener, EventListener {
* so throw this in a thread. * so throw this in a thread.
*/ */
private void registerPorts(Set<ForwardPort> portsToForwardNow) { private void registerPorts(Set<ForwardPort> portsToForwardNow) {
if (_log.shouldLog(Log.INFO))
_log.info("Starting thread to forward " + portsToForwardNow.size() + " ports");
Thread t = new Thread(new RegisterPortsThread(portsToForwardNow)); Thread t = new Thread(new RegisterPortsThread(portsToForwardNow));
t.setName("UPnP Port Opener " + (++__id)); t.setName("UPnP Port Opener " + (++__id));
t.setDaemon(true); t.setDaemon(true);
@ -683,6 +695,8 @@ class UPnP extends ControlPoint implements DeviceChangeListener, EventListener {
* so throw this in a thread. * so throw this in a thread.
*/ */
private void unregisterPorts(Set<ForwardPort> portsToForwardNow) { private void unregisterPorts(Set<ForwardPort> portsToForwardNow) {
if (_log.shouldLog(Log.INFO))
_log.info("Starting thread to un-forward " + portsToForwardNow.size() + " ports");
Thread t = new Thread(new UnregisterPortsThread(portsToForwardNow)); Thread t = new Thread(new UnregisterPortsThread(portsToForwardNow));
t.setName("UPnP Port Closer " + (++__id)); t.setName("UPnP Port Closer " + (++__id));
t.setDaemon(true); t.setDaemon(true);

View File

@ -60,6 +60,8 @@ class UPnPManager {
} }
public synchronized void start() { public synchronized void start() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("UPnP Start");
if (!_isRunning) if (!_isRunning)
_isRunning = _upnp.runPlugin(); _isRunning = _upnp.runPlugin();
if (!_isRunning) if (!_isRunning)
@ -83,7 +85,7 @@ class UPnPManager {
*/ */
public void update(Map<String, Integer> ports) { public void update(Map<String, Integer> ports) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("UPnP Update:"); _log.debug("UPnP Update with " + ports.size() + " ports");
if (!_isRunning) if (!_isRunning)
return; return;
Set<ForwardPort> forwards = new HashSet(ports.size()); Set<ForwardPort> forwards = new HashSet(ports.size());

View File

@ -914,6 +914,12 @@ public class ControlPoint implements HTTPRequestListener
setRenewSubscriber(null); setRenewSubscriber(null);
} }
// I2P so we will re-notify on restart
DeviceList dl = getDeviceList();
for (int i = 0; i < dl.size(); i++) {
removeDevice(dl.getDevice(i));
}
return true; return true;
} }