startup/shutdown synchronization in several places

This commit is contained in:
zzz
2012-09-16 15:47:36 +00:00
parent b6a5360390
commit 259c28f8c1
30 changed files with 131 additions and 118 deletions

View File

@ -324,14 +324,14 @@ public class InNetMessagePool implements Service {
public void renderStatusHTML(Writer out) {}
/** does nothing since we aren't threaded */
public void restart() {
public synchronized void restart() {
shutdown();
try { Thread.sleep(100); } catch (InterruptedException ie) {}
startup();
}
/** does nothing since we aren't threaded */
public void shutdown() {
public synchronized void shutdown() {
_alive = false;
if (!DISPATCH_DIRECT) {
synchronized (_pendingDataMessages) {
@ -343,7 +343,7 @@ public class InNetMessagePool implements Service {
}
/** does nothing since we aren't threaded */
public void startup() {
public synchronized void startup() {
_alive = true;
_dispatchThreaded = DEFAULT_DISPATCH_THREADED;
String threadedStr = _context.getProperty(PROP_DISPATCH_THREADED);

View File

@ -65,7 +65,7 @@ public class MessageHistory {
}
/** @since 0.8.12 */
public void shutdown() {
public synchronized void shutdown() {
if (_doLog)
addEntry(getPrefix() + "** Router shutdown");
_doPause = false;
@ -89,7 +89,7 @@ public class MessageHistory {
* Call this whenever the router identity changes.
*
*/
public void initialize(boolean forceReinitialize) {
public synchronized void initialize(boolean forceReinitialize) {
if (!forceReinitialize) return;
if (_context.router().getRouterInfo() == null) {

View File

@ -94,11 +94,11 @@ public class MessageValidator {
return dup;
}
public void startup() {
public synchronized void startup() {
_filter = new DecayingHashSet(_context, (int)Router.CLOCK_FUDGE_FACTOR * 2, 8, "RouterMV");
}
void shutdown() {
synchronized void shutdown() {
_filter.stopDecaying();
}
}

View File

@ -66,6 +66,7 @@ public class Router implements RouterClock.ClockShiftListener {
private String _configFilename;
private RouterInfo _routerInfo;
public final Object routerInfoFileLock = new Object();
private final Object _configFileLock = new Object();
private long _started;
private boolean _higherVersionSeen;
//private SessionKeyPersistenceHelper _sessionKeyPersistenceHelper;
@ -413,7 +414,7 @@ public class Router implements RouterClock.ClockShiftListener {
* Most users will just call main() instead.
* @since public as of 0.9 for Android and other embedded uses
*/
public void runRouter() {
public synchronized void runRouter() {
if (_isAlive)
throw new IllegalStateException();
String last = _config.get("router.previousFullVersion");
@ -468,16 +469,19 @@ public class Router implements RouterClock.ClockShiftListener {
*
* This is synchronized with saveConfig()
*/
public synchronized void readConfig() {
public void readConfig() {
synchronized(_configFileLock) {
String f = getConfigFilename();
Properties config = getConfig(_context, f);
// to avoid compiler errror
Map foo = _config;
foo.putAll(config);
}
}
/**
* this does not use ctx.getConfigDir(), must provide a full path in filename
* Caller must synchronize
*
* @param ctx will be null at startup when called from constructor
*/
@ -518,6 +522,7 @@ public class Router implements RouterClock.ClockShiftListener {
* has changed.
*/
public void rebuildRouterInfo() { rebuildRouterInfo(false); }
public void rebuildRouterInfo(boolean blockingRebuild) {
if (_log.shouldLog(Log.INFO))
_log.info("Rebuilding new routerInfo");
@ -698,7 +703,7 @@ public class Router implements RouterClock.ClockShiftListener {
* files, then reboot the router.
*
*/
public void rebuildNewIdentity() {
public synchronized void rebuildNewIdentity() {
if (_shutdownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(_shutdownHook);
@ -747,7 +752,7 @@ public class Router implements RouterClock.ClockShiftListener {
/**
* Shutdown with no chance of cancellation
*/
public void shutdown(int exitCode) {
public synchronized void shutdown(int exitCode) {
if (_shutdownInProgress)
return;
_shutdownInProgress = true;
@ -765,7 +770,7 @@ public class Router implements RouterClock.ClockShiftListener {
* Called by the ShutdownHook.
* NOT to be called by others, use shutdown().
*/
public void shutdown2(int exitCode) {
public synchronized void shutdown2(int exitCode) {
// help us shut down esp. after OOM
int priority = (exitCode == EXIT_OOM) ? Thread.MAX_PRIORITY - 1 : Thread.NORM_PRIORITY + 2;
Thread.currentThread().setPriority(priority);
@ -862,7 +867,7 @@ public class Router implements RouterClock.ClockShiftListener {
/**
* Cancel the JVM runtime hook before calling this.
*/
private void finalShutdown(int exitCode) {
private synchronized void finalShutdown(int exitCode) {
clearCaches();
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete" /* , new Exception("Shutdown") */ );
try { _context.logManager().shutdown(); } catch (Throwable t) { }
@ -977,7 +982,8 @@ public class Router implements RouterClock.ClockShiftListener {
*
* Synchronized with file read in getConfig()
*/
public synchronized boolean saveConfig() {
public boolean saveConfig() {
synchronized(_configFileLock) {
FileOutputStream fos = null;
try {
fos = new SecureFileOutputStream(_configFilename);
@ -1008,6 +1014,7 @@ public class Router implements RouterClock.ClockShiftListener {
return true;
}
}
/**
* Updates the current config and then saves it.
@ -1019,13 +1026,15 @@ public class Router implements RouterClock.ClockShiftListener {
* @return success
* @since 0.8.13
*/
public synchronized boolean saveConfig(String name, String value) {
public boolean saveConfig(String name, String value) {
synchronized(_configFileLock) {
if (value != null)
_config.put(name, value);
else
removeConfigSetting(name);
return saveConfig();
}
}
/**
* Updates the current config and then saves it.
@ -1037,7 +1046,8 @@ public class Router implements RouterClock.ClockShiftListener {
* @return success
* @since 0.8.13
*/
public synchronized boolean saveConfig(Map toAdd, Collection<String> toRemove) {
public boolean saveConfig(Map toAdd, Collection<String> toRemove) {
synchronized(_configFileLock) {
if (toAdd != null)
_config.putAll(toAdd);
if (toRemove != null) {
@ -1047,6 +1057,7 @@ public class Router implements RouterClock.ClockShiftListener {
}
return saveConfig();
}
}
/**
* The clock shift listener.

View File

@ -89,7 +89,7 @@ class ClientManager {
_isStarted = true;
}
public void restart() {
public synchronized void restart() {
shutdown("Router restart");
// to let the old listener die
@ -103,7 +103,7 @@ class ClientManager {
/**
* @param msg message to send to the clients
*/
public void shutdown(String msg) {
public synchronized void shutdown(String msg) {
_isStarted = false;
_log.info("Shutting down the ClientManager");
if (_listener != null)

View File

@ -52,13 +52,13 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
//_log.debug("Client manager facade created");
}
public void startup() {
public synchronized void startup() {
_log.info("Starting up the client subsystem");
int port = _context.getProperty(PROP_CLIENT_PORT, DEFAULT_PORT);
_manager = new ClientManager(_context, port);
}
public void shutdown() {
public synchronized void shutdown() {
shutdown("Router shutdown");
}
@ -66,12 +66,12 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
* @param msg message to send to the clients
* @since 0.8.8
*/
public void shutdown(String msg) {
public synchronized void shutdown(String msg) {
if (_manager != null)
_manager.shutdown(msg);
}
public void restart() {
public synchronized void restart() {
if (_manager != null)
_manager.restart();
else

View File

@ -65,7 +65,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
}
@Override
public void startup() {
public synchronized void startup() {
super.startup();
_context.jobQueue().addJob(new FloodfillMonitorJob(_context, this));
_lookupThrottler = new LookupThrottler();
@ -87,7 +87,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
* @since 0.8.9
*/
@Override
public void shutdown() {
public synchronized void shutdown() {
if (_floodfillEnabled) {
// turn off to build a new RI...
_floodfillEnabled = false;

View File

@ -198,7 +198,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_context.statManager().addRateData("netDb.exploreKeySet", _exploreKeys.size(), 0);
}
public void shutdown() {
public synchronized void shutdown() {
_initialized = false;
if (_kb != null)
_kb.clear();
@ -212,7 +212,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
// _exploreKeys = null;
}
public void restart() {
public synchronized void restart() {
_dbDir = _context.router().getConfigSetting(PROP_DB_DIR);
if (_dbDir == null) {
_log.info("No DB dir specified [" + PROP_DB_DIR + "], using [" + DEFAULT_DB_DIR + "]");
@ -240,7 +240,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
String getDbDir() { return _dbDir; }
public void startup() {
public synchronized void startup() {
_log.info("Starting up the kademlia network database");
RouterInfo ri = _context.router().getRouterInfo();
String dbDir = _context.getProperty(PROP_DB_DIR, DEFAULT_DB_DIR);

View File

@ -38,14 +38,14 @@ public class PeerManagerFacadeImpl implements PeerManagerFacade {
_testJob = new PeerTestJob(_context);
}
public void startup() {
public synchronized void startup() {
_log.info("Starting up the peer manager");
_manager = new PeerManager(_context);
_persistenceHelper.setUs(_context.routerHash());
_testJob.startTesting(_manager);
}
public void shutdown() {
public synchronized void shutdown() {
_log.info("Shutting down the peer manager");
_testJob.stopTesting();
if (_manager != null) {
@ -54,7 +54,7 @@ public class PeerManagerFacadeImpl implements PeerManagerFacade {
}
}
public void restart() {
public synchronized void restart() {
_manager.storeProfiles();
_persistenceHelper.setUs(_context.routerHash());
_manager.loadProfiles();

View File

@ -50,7 +50,7 @@ public class PeerTestJob extends JobImpl {
private int getTestConcurrency() { return 1; }
// FIXME Exporting non-public type through public API FIXME
public void startTesting(PeerManager manager) {
public synchronized void startTesting(PeerManager manager) {
_manager = manager;
_keepTesting = true;
this.getTiming().setStartAfter(getContext().clock().now() + DEFAULT_PEER_TEST_DELAY);
@ -58,7 +58,8 @@ public class PeerTestJob extends JobImpl {
if (_log.shouldLog(Log.INFO))
_log.info("Start testing peers");
}
public void stopTesting() {
public synchronized void stopTesting() {
_keepTesting = false;
if (_log.shouldLog(Log.INFO))
_log.info("Stop testing peers");

View File

@ -49,7 +49,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
startGeoIP();
}
public void startup() {
public synchronized void startup() {
_log.info("Starting up the comm system");
_manager = new TransportManager(_context);
_manager.startListening();
@ -59,13 +59,13 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
/**
* Cannot be restarted.
*/
public void shutdown() {
public synchronized void shutdown() {
if (_manager != null)
_manager.shutdown();
_geoIP.shutdown();
}
public void restart() {
public synchronized void restart() {
if (_manager == null)
startup();
else

View File

@ -140,13 +140,13 @@ public class FIFOBandwidthLimiter {
/** The configured maximum, not the current rate */
public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); }
public void reinitialize() {
public synchronized void reinitialize() {
clear();
_refiller.reinitialize();
}
/** @since 0.8.8 */
public void shutdown() {
public synchronized void shutdown() {
_refiller.shutdown();
_refillerThread.interrupt();
clear();

View File

@ -85,7 +85,7 @@ public class FIFOBandwidthRefiller implements Runnable {
}
/** @since 0.8.8 */
void shutdown() {
synchronized void shutdown() {
_isRunning = false;
}
@ -111,7 +111,7 @@ public class FIFOBandwidthRefiller implements Runnable {
}
}
void reinitialize() {
synchronized void reinitialize() {
_lastRefillTime = _limiter.now();
checkConfig();
_lastCheckConfigTime = _lastRefillTime;

View File

@ -136,7 +136,7 @@ public class TransportManager implements TransportEventListener {
t.forwardPortStatus(port, externalPort, success, reason);
}
public void startListening() {
public synchronized void startListening() {
if (_dhThread.getState() == Thread.State.NEW)
_dhThread.start();
// For now, only start UPnP if we have no publicly-routable addresses
@ -159,7 +159,7 @@ public class TransportManager implements TransportEventListener {
_context.router().rebuildRouterInfo();
}
public void restart() {
public synchronized void restart() {
stopListening();
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
startListening();
@ -168,7 +168,7 @@ public class TransportManager implements TransportEventListener {
/**
* Can be restarted.
*/
public void stopListening() {
public synchronized void stopListening() {
if (_upnpManager != null)
_upnpManager.stop();
for (Transport t : _transports.values()) {
@ -182,7 +182,7 @@ public class TransportManager implements TransportEventListener {
* Cannot be restarted.
* @since 0.9
*/
public void shutdown() {
public synchronized void shutdown() {
stopListening();
_dhThread.shutdown();
Addresses.clearCaches();

View File

@ -46,12 +46,12 @@ class NTCPSendFinisher {
//_context.statManager().createRateStat("ntcp.sendFinishTime", "How long to queue and excecute msg.afterSend()", "ntcp", new long[] {5*1000});
}
public void start() {
public synchronized void start() {
_count = 0;
_executor = new CustomThreadPoolExecutor(THREADS);
}
public void stop() {
public synchronized void stop() {
if (_executor != null)
_executor.shutdownNow();
}

View File

@ -36,7 +36,7 @@ class Reader {
_readAfterLive = new HashSet(8);
}
public void startReading(int numReaders) {
public synchronized void startReading(int numReaders) {
for (int i = 1; i <= numReaders; i++) {
Runner r = new Runner();
I2PThread t = new I2PThread(r, "NTCP reader " + i + '/' + numReaders, true);
@ -45,7 +45,7 @@ class Reader {
}
}
public void stopReading() {
public synchronized void stopReading() {
while (!_runners.isEmpty()) {
Runner r = _runners.remove(0);
r.stop();

View File

@ -32,7 +32,7 @@ class Writer {
_writeAfterLive = new HashSet(5);
}
public void startWriting(int numWriters) {
public synchronized void startWriting(int numWriters) {
for (int i = 1; i <=numWriters; i++) {
Runner r = new Runner();
I2PThread t = new I2PThread(r, "NTCP writer " + i + '/' + numWriters, true);
@ -40,7 +40,7 @@ class Writer {
t.start();
}
}
public void stopWriting() {
public synchronized void stopWriting() {
while (!_runners.isEmpty()) {
Runner r = _runners.remove(0);
r.stop();

View File

@ -53,14 +53,14 @@ class ACKSender implements Runnable {
_peersToACK.offer(peer);
}
public void startup() {
public synchronized void startup() {
_alive = true;
_peersToACK.clear();
I2PThread t = new I2PThread(this, "UDP ACK sender", true);
t.start();
}
public void shutdown() {
public synchronized void shutdown() {
_alive = false;
PeerState poison = new PeerState(_context, _transport, null, 0, null, false);
poison.setTheyRelayToUsAs(POISON_PS);

View File

@ -157,13 +157,13 @@ class EstablishmentManager {
//_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
}
public void startup() {
public synchronized void startup() {
_alive = true;
I2PThread t = new I2PThread(new Establisher(), "UDP Establisher", true);
t.start();
}
public void shutdown() {
public synchronized void shutdown() {
_alive = false;
notifyActivity();
}

View File

@ -48,7 +48,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
_context.statManager().createRateStat("udp.receivePiggyback", "How many acks were included in a packet with data fragments (time == # data fragments)", "udp", UDPTransport.RATES);
}
public void startup() {
public synchronized void startup() {
_alive = true;
// may want to extend the DecayingBloomFilter so we can use a smaller
// array size (currently its tuned for 10 minute rates for the
@ -57,7 +57,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
_ackSender.startup();
_messageReceiver.startup();
}
public void shutdown() {
public synchronized void shutdown() {
_alive = false;
if (_recentlyCompletedMessages != null)
_recentlyCompletedMessages.stopDecaying();

View File

@ -69,7 +69,7 @@ class MessageReceiver {
_alive = true;
}
public void startup() {
public synchronized void startup() {
_alive = true;
for (int i = 0; i < _threadCount; i++) {
I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + (i+1) + '/' + _threadCount, true);
@ -83,7 +83,7 @@ class MessageReceiver {
public void run() { loop(_handler); }
}
public void shutdown() {
public synchronized void shutdown() {
_alive = false;
_completeMessages.clear();
for (int i = 0; i < _threadCount; i++) {

View File

@ -92,9 +92,9 @@ class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendCycleTimeSlow", "How long it takes to cycle through all of the active messages, when its going slowly?", "udp", UDPTransport.RATES);
}
public void startup() { _alive = true; }
public synchronized void startup() { _alive = true; }
public void shutdown() {
public synchronized void shutdown() {
_alive = false;
_activePeers.clear();
synchronized (_activePeers) {

View File

@ -91,7 +91,7 @@ class PacketHandler {
//_context.statManager().createRateStat("udp.receivePacketSize.relayResponse", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
}
public void startup() {
public synchronized void startup() {
_keepReading = true;
for (int i = 0; i < _handlers.length; i++) {
I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + (i+1) + '/' + _handlers.length, true);
@ -99,7 +99,7 @@ class PacketHandler {
}
}
public void shutdown() {
public synchronized void shutdown() {
_keepReading = false;
}

View File

@ -23,13 +23,13 @@ class PacketPusher implements Runnable {
_sender = sender;
}
public void startup() {
public synchronized void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP packet pusher", true);
t.start();
}
public void shutdown() { _alive = false; }
public synchronized void shutdown() { _alive = false; }
public void run() {
while (_alive) {

View File

@ -59,14 +59,14 @@ class UDPReceiver {
_context.statManager().createRateStat("udp.ignorePacketFromDroplist", "Packet lifetime for those dropped on the drop list", "udp", UDPTransport.RATES);
}
public void startup() {
public synchronized void startup() {
//adjustDropProbability();
_keepRunning = true;
I2PThread t = new I2PThread(_runner, _name + '.' + _id, true);
t.start();
}
public void shutdown() {
public synchronized void shutdown() {
_keepRunning = false;
_inboundQueue.clear();
for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {

View File

@ -64,7 +64,7 @@ class UDPSender {
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_CREAT, "session created packet size", "udp", UDPTransport.RATES);
}
public void startup() {
public synchronized void startup() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting the runner: " + _name);
_keepRunning = true;
@ -72,7 +72,7 @@ class UDPSender {
t.start();
}
public void shutdown() {
public synchronized void shutdown() {
_keepRunning = false;
_outboundQueue.clear();
UDPPacket poison = UDPPacket.acquire(_context, false);

View File

@ -245,7 +245,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_context.simpleScheduler().addPeriodicEvent(new PingIntroducers(), MIN_EXPIRE_TIMEOUT * 3 / 4);
}
public void startup() {
public synchronized void startup() {
_fragments.shutdown();
if (_pusher != null)
_pusher.shutdown();
@ -373,7 +373,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_testEvent.reschedule(10*1000); // lets requeue it for Real Soon
}
public void shutdown() {
public synchronized void shutdown() {
destroyAll();
if (_endpoint != null)
_endpoint.shutdown();

View File

@ -86,7 +86,7 @@ class BuildExecutor implements Runnable {
/**
* @since 0.9
*/
public void restart() {
public synchronized void restart() {
synchronized (_recentBuildIds) {
_recentBuildIds.clear();
}
@ -98,7 +98,7 @@ class BuildExecutor implements Runnable {
* Cannot be restarted.
* @since 0.9
*/
public void shutdown() {
public synchronized void shutdown() {
_isRunning = false;
restart();
}

View File

@ -134,7 +134,7 @@ class BuildHandler implements Runnable {
* @param numThreads the number of threads to be shut down
* @since 0.9
*/
public void shutdown(int numThreads) {
public synchronized void shutdown(int numThreads) {
_isRunning = false;
_inboundBuildMessages.clear();
BuildMessageState poison = new BuildMessageState(null, null, null);

View File

@ -390,7 +390,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
}
public void restart() {
public synchronized void restart() {
_handler.restart();
_executor.restart();
shutdownExploratory();
@ -504,7 +504,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
}
public void startup() {
public synchronized void startup() {
_isShutdown = false;
if (!_executor.isRunning()) {
I2PThread t = new I2PThread(_executor, "BuildExecutor");
@ -546,7 +546,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
/**
* Cannot be restarted
*/
public void shutdown() {
public synchronized void shutdown() {
_handler.shutdown(_numHandlerThreads);
_executor.shutdown();
shutdownExploratory();