* Shutdown:

- Implement and call shutdown for BandwidthRefiller
    - Implement and register shutdown hook for Timestamper
    - Implement and register shutdown hook for Jetty console server
    - Fix UPnP-SSDPNotifySocket thread not stopping
    - Fix all but one UDP PacketHandler threads not stopping
This commit is contained in:
zzz
2011-06-16 19:04:23 +00:00
parent 518fdd8c03
commit cb72bb0427
11 changed files with 104 additions and 25 deletions

View File

@ -37,6 +37,8 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener {
private File _newsFile;
private File _tempFile;
private static NewsFetcher _instance;
private volatile boolean _isRunning;
//public static final synchronized NewsFetcher getInstance() { return _instance; }
public static final synchronized NewsFetcher getInstance(I2PAppContext ctx) {
if (_instance != null)
@ -64,6 +66,12 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener {
_tempFile = new File(_context.getTempDir(), TEMP_NEWS_FILE);
updateLastFetched();
_updateVersion = "";
_isRunning = true;
}
/** @since 0.8.8 */
void shutdown() {
_isRunning = false;
}
private void updateLastFetched() {
@ -108,7 +116,7 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener {
public void run() {
try { Thread.sleep(INITIAL_DELAY + _context.random().nextLong(INITIAL_DELAY)); } catch (InterruptedException ie) {}
while (true) {
while (_isRunning) {
if (!_updateAvailable) checkForUpdates();
if (shouldFetchNews()) {
fetchNews();

View File

@ -342,10 +342,10 @@ public class RouterConsoleRunner {
}
NewsFetcher fetcher = NewsFetcher.getInstance(I2PAppContext.getGlobalContext());
Thread t = new I2PAppThread(fetcher, "NewsFetcher", true);
t.start();
Thread newsThread = new I2PAppThread(fetcher, "NewsFetcher", true);
newsThread.start();
t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true);
Thread t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true);
t.start();
List<RouterContext> contexts = RouterContext.listContexts();
@ -356,6 +356,9 @@ public class RouterConsoleRunner {
t.start();
ctx.addShutdownTask(new PluginStopper(ctx));
}
ctx.addShutdownTask(new NewsShutdown(fetcher, newsThread));
// stat summarizer registers its own hook
ctx.addShutdownTask(new ServerShutdown());
}
}
@ -495,15 +498,30 @@ public class RouterConsoleRunner {
}
}
/*******
public void stopConsole() {
/** @since 0.8.8 */
private class ServerShutdown implements Runnable {
public void run() {
try {
_server.stop();
} catch (InterruptedException ie) {
ie.printStackTrace();
} catch (InterruptedException ie) {}
}
}
/** @since 0.8.8 */
private static class NewsShutdown implements Runnable {
private final NewsFetcher _fetcher;
private final Thread _newsThread;
public NewsShutdown(NewsFetcher fetcher, Thread t) {
_fetcher = fetcher;
_newsThread = t;
}
public void run() {
_fetcher.shutdown();
_newsThread.interrupt();
}
}
********/
public static Properties webAppProperties() {
return webAppProperties(I2PAppContext.getGlobalContext().getConfigDir().getAbsolutePath());

View File

@ -29,6 +29,8 @@ public class Timestamper implements Runnable {
private boolean _daemon;
private boolean _initialized;
private boolean _wellSynced;
private volatile boolean _isRunning;
private Thread _timestamperThread;
private static final int MIN_QUERY_FREQUENCY = 5*60*1000;
private static final int DEFAULT_QUERY_FREQUENCY = 5*60*1000;
@ -106,10 +108,11 @@ public class Timestamper implements Runnable {
}
private void startTimestamper() {
I2PThread t = new I2PThread(this, "Timestamper");
t.setPriority(I2PThread.MIN_PRIORITY);
t.setDaemon(_daemon);
t.start();
_timestamperThread = new I2PThread(this, "Timestamper", _daemon);
_timestamperThread.setPriority(I2PThread.MIN_PRIORITY);
_isRunning = true;
_timestamperThread.start();
_context.addShutdownTask(new Shutdown());
}
public void waitForInitialization() {
@ -121,6 +124,15 @@ public class Timestamper implements Runnable {
} catch (InterruptedException ie) {}
}
/** @since 0.8.8 */
private class Shutdown implements Runnable {
public void run() {
_isRunning = false;
if (_timestamperThread != null)
_timestamperThread.interrupt();
}
}
public void run() {
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
_log = _context.logManager().getLog(Timestamper.class);
@ -128,7 +140,7 @@ public class Timestamper implements Runnable {
_log.info("Starting timestamper");
boolean lastFailed = false;
try {
while (true) {
while (_isRunning) {
updateConfig();
if (!_disabled) {
// first the servers for our country, if we know what country we're in...

View File

@ -967,6 +967,7 @@ public class Router {
try { _context.tunnelDispatcher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel dispatcher", t); }
try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); }
try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); }
try { _context.bandwidthLimiter().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); }
try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); }
try { _context.messageRegistry().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message registry", t); }
try { _context.messageValidator().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message validator", t); }

View File

@ -64,6 +64,7 @@ public class FIFOBandwidthLimiter {
/** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */
private final AtomicLong _totalWastedOutboundBytes = new AtomicLong();
private final FIFOBandwidthRefiller _refiller;
private final Thread _refillerThread;
private long _lastTotalSent;
private long _lastTotalReceived;
@ -91,9 +92,9 @@ public class FIFOBandwidthLimiter {
_lastTotalReceived = _totalAllocatedInboundBytes.get();
_lastStatsUpdated = now();
_refiller = new FIFOBandwidthRefiller(_context, this);
I2PThread t = new I2PThread(_refiller, "BWRefiller", true);
t.setPriority(I2PThread.NORM_PRIORITY-1);
t.start();
_refillerThread = new I2PThread(_refiller, "BWRefiller", true);
_refillerThread.setPriority(I2PThread.NORM_PRIORITY-1);
_refillerThread.start();
}
//public long getAvailableInboundBytes() { return _availableInboundBytes; }
@ -122,6 +123,19 @@ public class FIFOBandwidthLimiter {
public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); }
public void reinitialize() {
clear();
_refiller.reinitialize();
}
/** @since 0.8.8 */
public void shutdown() {
_refiller.shutdown();
_refillerThread.interrupt();
clear();
}
/** @since 0.8.8 */
private void clear() {
_pendingInboundRequests.clear();
_pendingOutboundRequests.clear();
_availableInbound.set(0);
@ -134,7 +148,6 @@ public class FIFOBandwidthLimiter {
_unavailableOutboundBurst.set(0);
_inboundUnlimited = false;
_outboundUnlimited = false;
_refiller.reinitialize();
}
public Request createRequest() { return new SimpleRequest(); }

View File

@ -24,6 +24,7 @@ public class FIFOBandwidthRefiller implements Runnable {
private long _lastCheckConfigTime;
/** how frequently do we check the config for updates? */
private long _configCheckPeriodMs = 60*1000;
private volatile boolean _isRunning;
public static final String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond";
public static final String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond";
@ -67,12 +68,19 @@ public class FIFOBandwidthRefiller implements Runnable {
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
reinitialize();
_isRunning = true;
}
/** @since 0.8.8 */
public void shutdown() {
_isRunning = false;
}
public void run() {
// bootstrap 'em with nothing
_lastRefillTime = _limiter.now();
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList(2);
while (true) {
while (_isRunning) {
long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
checkConfig();

View File

@ -113,6 +113,11 @@ class PacketHandler {
return rv.toString();
}
/** @since 0.8.8 */
int getHandlerCount() {
return _handlers.length;
}
/** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */
private static final short OUTBOUND_FALLBACK = 1;
/** the packet is from a peer we are establishing an inbound con to, but failed validation, so fallback */

View File

@ -147,6 +147,7 @@ class UDPEndpoint {
/**
* Blocking call to receive the next inbound UDP packet from any peer.
* @return null if we have shut down
*/
public UDPPacket receive() {
if (_receiver == null)

View File

@ -58,9 +58,11 @@ class UDPReceiver {
public void shutdown() {
_keepRunning = false;
_inboundQueue.clear();
for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
}
for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
try {
Thread.sleep(i * 50);

View File

@ -1367,6 +1367,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return "";
}
/** @since 0.8.8 */
int getPacketHandlerCount() {
PacketHandler handler = _handler;
if (handler != null)
return handler.getHandlerCount();
else
return 0;
}
private static final int DROP_INACTIVITY_TIME = 60*1000;
public void failed(OutboundMessageState msg) { failed(msg, true); }

View File

@ -120,7 +120,9 @@ public class HTTPMUSocket
return true;
try {
ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf);
// I2P close it instead of leaving group so the thread dies
//ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf);
ssdpMultiSock.close();
ssdpMultiSock = null;
}
catch (Exception e) {