propagate from branch 'i2p.i2p' (head 59d72ad41332fbcf2829c85f6b3e38aaca9ee528)

to branch 'i2p.i2p.zzz.android' (head f7e9a993c660229cca575d26a0ba06eea36cea8e)
This commit is contained in:
zzz
2011-06-23 17:42:29 +00:00
52 changed files with 767 additions and 202 deletions

View File

@ -22,8 +22,8 @@ import net.i2p.util.Log;
*
*/
public class I2NPMessageHandler {
private Log _log;
private I2PAppContext _context;
private final Log _log;
private final I2PAppContext _context;
private long _lastReadBegin;
private long _lastReadEnd;
private int _lastSize;

View File

@ -28,8 +28,8 @@ import net.i2p.util.SimpleByteCache;
* @author jrandom
*/
public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPMessage {
private Log _log;
protected I2PAppContext _context;
private final Log _log;
protected final I2PAppContext _context;
private long _expiration;
private long _uniqueId;

View File

@ -280,14 +280,22 @@ public class JobQueue {
void shutdown() {
_alive = false;
_timedJobs.clear();
_readyJobs.clear();
synchronized (_jobLock) {
_timedJobs.clear();
_readyJobs.clear();
_jobLock.notifyAll();
}
// The JobQueueRunners are NOT daemons,
// so they must be stopped.
Job poison = new PoisonJob();
for (int i = 0; i < _queueRunners.size(); i++)
for (JobQueueRunner runner : _queueRunners.values()) {
runner.stopRunning();
_readyJobs.offer(poison);
// TODO interrupt thread for each runner
}
_queueRunners.clear();
_jobStats.clear();
_runnerId = 0;
/********
if (_log.shouldLog(Log.WARN)) {

View File

@ -45,10 +45,12 @@ import net.i2p.stat.RateStat;
import net.i2p.stat.StatManager;
import net.i2p.util.ByteCache;
import net.i2p.util.FileUtil;
import net.i2p.util.FortunaRandomSource;
import net.i2p.util.I2PAppThread;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SecureFileOutputStream;
import net.i2p.util.SimpleByteCache;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
@ -73,6 +75,8 @@ public class Router {
private I2PThread.OOMEventListener _oomListener;
private ShutdownHook _shutdownHook;
private final I2PThread _gracefulShutdownDetector;
private final RouterWatchdog _watchdog;
private final Thread _watchdogThread;
public final static String PROP_CONFIG_FILE = "router.configLocation";
@ -187,6 +191,19 @@ public class Router {
// Save this in the context for the logger and apps that need it
envProps.setProperty("i2p.systemTimeZone", originalTimeZoneID);
// Make darn sure we don't have a leftover I2PAppContext in the same JVM
// e.g. on Android - see finalShutdown() also
List<RouterContext> contexts = RouterContext.getContexts();
if (contexts.isEmpty()) {
RouterContext.killGlobalContext();
} else if (System.getProperty("java.vendor").contains("Android")) {
System.err.println("Warning: Killing " + contexts.size() + " other routers in this JVM");
contexts.clear();
RouterContext.killGlobalContext();
} else {
System.err.println("Warning: " + contexts.size() + " other routers in this JVM");
}
// The important thing that happens here is the directory paths are set and created
// i2p.dir.router defaults to i2p.dir.config
// i2p.dir.app defaults to i2p.dir.router
@ -257,7 +274,7 @@ public class Router {
_killVMOnEnd = true;
_oomListener = new I2PThread.OOMEventListener() {
public void outOfMemory(OutOfMemoryError oom) {
ByteCache.clearAll();
clearCaches();
_log.log(Log.CRIT, "Thread ran out of memory", oom);
for (int i = 0; i < 5; i++) { // try this 5 times, in case it OOMs
try {
@ -275,11 +292,18 @@ public class Router {
_gracefulShutdownDetector = new I2PAppThread(new GracefulShutdown(), "Graceful shutdown hook", true);
_gracefulShutdownDetector.start();
Thread watchdog = new I2PAppThread(new RouterWatchdog(_context), "RouterWatchdog", true);
watchdog.start();
_watchdog = new RouterWatchdog(_context);
_watchdogThread = new I2PAppThread(_watchdog, "RouterWatchdog", true);
_watchdogThread.start();
}
/** @since 0.8.8 */
private static final void clearCaches() {
ByteCache.clearAll();
SimpleByteCache.clearAll();
}
/**
* Configure the router to kill the JVM when the router shuts down, as well
* as whether to explicitly halt the JVM during the hard fail process.
@ -616,12 +640,15 @@ public class Router {
public void rebuildNewIdentity() {
killKeys();
for (Runnable task : _context.getShutdownTasks()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Running shutdown task " + task.getClass());
try {
task.run();
} catch (Throwable t) {
_log.log(Log.CRIT, "Error running shutdown task", t);
}
}
_context.removeShutdownTasks();
// hard and ugly
if (System.getProperty("wrapper.version") != null)
_log.log(Log.CRIT, "Restarting with new router identity");
@ -632,7 +659,10 @@ public class Router {
private void warmupCrypto() {
_context.random().nextBoolean();
new DHSessionKeyBuilder(); // load the class so it starts the precalc process
// Use restart() to refire the static refiller threads, in case
// we are restarting the router in the same JVM (Android)
DHSessionKeyBuilder.restart();
_context.elGamalEngine().restart();
}
private void startupQueue() {
@ -938,12 +968,15 @@ public class Router {
// Run the shutdown hooks first in case they want to send some goodbye messages
// Maybe we need a delay after this too?
for (Runnable task : _context.getShutdownTasks()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Running shutdown task " + task.getClass());
try {
task.run();
} catch (Throwable t) {
_log.log(Log.CRIT, "Error running shutdown task", t);
}
}
_context.removeShutdownTasks();
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the client manager", t); }
try { _context.namingService().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the naming service", t); }
try { _context.jobQueue().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the job queue", t); }
@ -953,13 +986,37 @@ public class Router {
try { _context.tunnelDispatcher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel dispatcher", t); }
try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); }
try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); }
try { _context.bandwidthLimiter().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); }
try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); }
try { _context.messageRegistry().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message registry", t); }
try { _context.messageValidator().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message validator", t); }
try { _context.inNetMessagePool().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the inbound net pool", t); }
//try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); }
_context.deleteTempDir();
RouterContext.listContexts().remove(_context);
List<RouterContext> contexts = RouterContext.getContexts();
contexts.remove(_context);
// shut down I2PAppContext tasks here
// If there are multiple routers in the JVM, we don't want to do this
// to the DH or YK tasks, as they are singletons.
if (contexts.isEmpty()) {
try {
DHSessionKeyBuilder.shutdown();
} catch (Throwable t) { _log.log(Log.CRIT, "Error shutting DH", t); }
try {
_context.elGamalEngine().shutdown();
} catch (Throwable t) { _log.log(Log.CRIT, "Error shutting elGamal", t); }
} else {
_log.logAlways(Log.WARN, "Warning - " + contexts.size() + " routers remaining in this JVM, not releasing all resources");
}
try {
((FortunaRandomSource)_context.random()).shutdown();
} catch (Throwable t) { _log.log(Log.CRIT, "Error shutting random()", t); }
// logManager shut down in finalShutdown()
_watchdog.shutdown();
_watchdogThread.interrupt();
finalShutdown(exitCode);
}
@ -970,6 +1027,7 @@ public class Router {
private static final boolean ALLOW_DYNAMIC_KEYS = false;
private void finalShutdown(int exitCode) {
clearCaches();
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete" /* , new Exception("Shutdown") */ );
try { _context.logManager().shutdown(); } catch (Throwable t) { }
if (ALLOW_DYNAMIC_KEYS) {
@ -979,6 +1037,20 @@ public class Router {
File f = getPingFile();
f.delete();
if (RouterContext.getContexts().isEmpty())
RouterContext.killGlobalContext();
// Since 0.8.8, mainly for Android
for (Runnable task : _context.getFinalShutdownTasks()) {
System.err.println("Running final shutdown task " + task.getClass());
try {
task.run();
} catch (Throwable t) {
System.err.println("Running final shutdown task " + t);
}
}
_context.getFinalShutdownTasks().clear();
if (_killVMOnEnd) {
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
Runtime.getRuntime().halt(exitCode);
@ -1537,7 +1609,7 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
getContext().statManager().addRateData("router.memoryUsed", used, 0);
if (_maxMemory - used < LOW_MEMORY_THRESHOLD)
ByteCache.clearAll();
clearCaches();
getContext().tunnelDispatcher().updateParticipatingStats(COALESCE_TIME);

View File

@ -1,8 +1,11 @@
package net.i2p.router;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
@ -55,6 +58,7 @@ public class RouterContext extends I2PAppContext {
private MessageValidator _messageValidator;
private MessageStateMonitor _messageStateMonitor;
private RouterThrottle _throttle;
private final Set<Runnable> _finalShutdownTasks;
private static List<RouterContext> _contexts = new ArrayList(1);
@ -67,7 +71,10 @@ public class RouterContext extends I2PAppContext {
// to init everything. Caller MUST call initAll() afterwards.
// Sorry, this breaks some main() unit tests out there.
//initAll();
if (!_contexts.isEmpty())
System.err.println("Warning - More than one router in this JVM");
_contexts.add(this);
_finalShutdownTasks = new CopyOnWriteArraySet();
}
/**
@ -165,11 +172,37 @@ public class RouterContext extends I2PAppContext {
/**
* Retrieve the list of router contexts currently instantiated in this JVM.
* This will always contain only one item (except when a simulation per the
* MultiRouter is going on), and the list should only be modified when a new
* MultiRouter is going on).
*
* @return an unmodifiable list (as of 0.8.8). May be null or empty.
*/
public static List<RouterContext> listContexts() {
return Collections.unmodifiableList(_contexts);
}
/**
* Same as listContexts() but package private and modifiable.
* The list should only be modified when a new
* context is created or a router is shut down.
*
* @since 0.8.8
*/
public static List<RouterContext> listContexts() { return _contexts; }
static List<RouterContext> getContexts() {
return _contexts;
}
/**
* Kill the global I2PAppContext, so it isn't still around
* when we restart in the same JVM (Android).
* Only do this if there are no other routers in the JVM.
*
* @since 0.8.8
*/
static void killGlobalContext() {
synchronized (I2PAppContext.class) {
_globalAppContext = null;
}
}
/** what router is this context working for? */
public Router router() { return _router; }
@ -402,6 +435,32 @@ public class RouterContext extends I2PAppContext {
}
}
/**
* @since 0.8.8
*/
void removeShutdownTasks() {
_shutdownTasks.clear();
}
/**
* The last thing to be called before router shutdown.
* No context resources, including logging, will be available.
* Only for external threads in the same JVM needing to know when
* the shutdown is complete, like Android.
* @since 0.8.8
*/
public void addFinalShutdownTask(Runnable task) {
_finalShutdownTasks.add(task);
}
/**
* @return the Set
* @since 0.8.8
*/
Set<Runnable> getFinalShutdownTasks() {
return _finalShutdownTasks;
}
/**
* Use this instead of context instanceof RouterContext
* @return true

View File

@ -15,14 +15,21 @@ class RouterWatchdog implements Runnable {
private final Log _log;
private final RouterContext _context;
private int _consecutiveErrors;
private volatile boolean _isRunning;
private static final long MAX_JOB_RUN_LAG = 60*1000;
public RouterWatchdog(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(RouterWatchdog.class);
_isRunning = true;
}
/** @since 0.8.8 */
public void shutdown() {
_isRunning = false;
}
public boolean verifyJobQueueLiveliness() {
long when = _context.jobQueue().getLastJobBegin();
if (when < 0)
@ -109,7 +116,7 @@ class RouterWatchdog implements Runnable {
}
public void run() {
while (true) {
while (_isRunning) {
try { Thread.sleep(60*1000); } catch (InterruptedException ie) {}
monitorRouter();
}

View File

@ -359,6 +359,9 @@ class PersistentDataStore extends TransientDataStore {
if (routerInfoFiles.length > 5)
_alreadyWarned = false;
for (int i = 0; i < routerInfoFiles.length; i++) {
// drop out if the router gets killed right after startup
if (!_context.router().isAlive())
break;
Hash key = getRouterInfoHash(routerInfoFiles[i].getName());
if ( (key != null) && (!isKnown(key)) ) {
// Run it inline so we don't clog up the job queue, esp. at startup

View File

@ -52,6 +52,11 @@ class PeerManager {
private static final long REORGANIZE_TIME_MEDIUM = 123*1000;
private static final long REORGANIZE_TIME_LONG = 551*1000;
/**
* Warning - this loads all the profiles in the constructor.
* This may take a long time - 30 seconds or more.
* Instantiate this in a Job or Thread.
*/
public PeerManager(RouterContext context) {
_context = context;
_log = context.logManager().getLog(PeerManager.class);
@ -99,6 +104,14 @@ class PeerManager {
}
}
/** @since 0.8.8 */
void clearProfiles() {
_organizer.clearProfiles();
_capabilitiesByPeer.clear();
for (int i = 0; i < _peersByCapability.length; i++)
_peersByCapability[i].clear();
}
Set selectPeers() {
return _organizer.selectAllPeers();
}
@ -111,6 +124,9 @@ class PeerManager {
_persistenceHelper.writeProfile(prof);
}
/**
* This may take a long time - 30 seconds or more
*/
void loadProfiles() {
Set<PeerProfile> profiles = _persistenceHelper.readProfiles();
for (Iterator<PeerProfile> iter = profiles.iterator(); iter.hasNext();) {

View File

@ -47,8 +47,10 @@ public class PeerManagerFacadeImpl implements PeerManagerFacade {
public void shutdown() {
_log.info("Shutting down the peer manager");
_testJob.stopTesting();
if (_manager != null)
if (_manager != null) {
_manager.storeProfiles();
_manager.clearProfiles();
}
}
public void restart() {

View File

@ -227,6 +227,19 @@ public class ProfileOrganizer {
public boolean isWellIntegrated(Hash peer) { return isX(_wellIntegratedPeers, peer); }
public boolean isFailing(Hash peer) { return isX(_failingPeers, peer); }
/** @since 0.8.8 */
void clearProfiles() {
getReadLock();
try {
_failingPeers.clear();
_fastPeers.clear();
_highCapacityPeers.clear();
_notFailingPeers.clear();
_notFailingPeersList.clear();
_wellIntegratedPeers.clear();
} finally { releaseReadLock(); }
}
/**
* if a peer sends us more than 5 replies in a searchReply that we cannot
* fetch, stop listening to them.

View File

@ -64,6 +64,7 @@ public class FIFOBandwidthLimiter {
/** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */
private final AtomicLong _totalWastedOutboundBytes = new AtomicLong();
private final FIFOBandwidthRefiller _refiller;
private final Thread _refillerThread;
private long _lastTotalSent;
private long _lastTotalReceived;
@ -91,9 +92,9 @@ public class FIFOBandwidthLimiter {
_lastTotalReceived = _totalAllocatedInboundBytes.get();
_lastStatsUpdated = now();
_refiller = new FIFOBandwidthRefiller(_context, this);
I2PThread t = new I2PThread(_refiller, "BWRefiller", true);
t.setPriority(I2PThread.NORM_PRIORITY-1);
t.start();
_refillerThread = new I2PThread(_refiller, "BWRefiller", true);
_refillerThread.setPriority(I2PThread.NORM_PRIORITY-1);
_refillerThread.start();
}
//public long getAvailableInboundBytes() { return _availableInboundBytes; }
@ -122,6 +123,19 @@ public class FIFOBandwidthLimiter {
public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); }
public void reinitialize() {
clear();
_refiller.reinitialize();
}
/** @since 0.8.8 */
public void shutdown() {
_refiller.shutdown();
_refillerThread.interrupt();
clear();
}
/** @since 0.8.8 */
private void clear() {
_pendingInboundRequests.clear();
_pendingOutboundRequests.clear();
_availableInbound.set(0);
@ -134,7 +148,6 @@ public class FIFOBandwidthLimiter {
_unavailableOutboundBurst.set(0);
_inboundUnlimited = false;
_outboundUnlimited = false;
_refiller.reinitialize();
}
public Request createRequest() { return new SimpleRequest(); }

View File

@ -24,6 +24,7 @@ public class FIFOBandwidthRefiller implements Runnable {
private long _lastCheckConfigTime;
/** how frequently do we check the config for updates? */
private long _configCheckPeriodMs = 60*1000;
private volatile boolean _isRunning;
public static final String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond";
public static final String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond";
@ -67,12 +68,19 @@ public class FIFOBandwidthRefiller implements Runnable {
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
reinitialize();
_isRunning = true;
}
/** @since 0.8.8 */
public void shutdown() {
_isRunning = false;
}
public void run() {
// bootstrap 'em with nothing
_lastRefillTime = _limiter.now();
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList(2);
while (true) {
while (_isRunning) {
long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
checkConfig();

View File

@ -1089,6 +1089,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
private static final int MAX_HANDLERS = 4;
/**
* FIXME static queue mixes handlers from different contexts in multirouter JVM
*/
private final static LinkedBlockingQueue<I2NPMessageHandler> _i2npHandlers = new LinkedBlockingQueue(MAX_HANDLERS);
private final static I2NPMessageHandler acquireHandler(RouterContext ctx) {
@ -1129,6 +1133,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_dataReadBufs.offer(buf);
}
/** @since 0.8.8 */
static void releaseResources() {
_i2npHandlers.clear();
_dataReadBufs.clear();
synchronized(_bufs) {
_bufs.clear();
}
}
/**
* sizeof(data)+data+pad+crc.
*

View File

@ -702,6 +702,7 @@ public class NTCPTransport extends TransportImpl {
NTCPConnection con = (NTCPConnection)iter.next();
con.close();
}
NTCPConnection.releaseResources();
// will this work?
replaceAddress(null);
}

View File

@ -113,6 +113,11 @@ class PacketHandler {
return rv.toString();
}
/** @since 0.8.8 */
int getHandlerCount() {
return _handlers.length;
}
/** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */
private static final short OUTBOUND_FALLBACK = 1;
/** the packet is from a peer we are establishing an inbound con to, but failed validation, so fallback */

View File

@ -147,6 +147,7 @@ class UDPEndpoint {
/**
* Blocking call to receive the next inbound UDP packet from any peer.
* @return null if we have shut down
*/
public UDPPacket receive() {
if (_receiver == null)

View File

@ -58,9 +58,11 @@ class UDPReceiver {
public void shutdown() {
_keepRunning = false;
_inboundQueue.clear();
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
}
for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
try {
Thread.sleep(i * 50);

View File

@ -1367,6 +1367,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return "";
}
/** @since 0.8.8 */
int getPacketHandlerCount() {
PacketHandler handler = _handler;
if (handler != null)
return handler.getHandlerCount();
else
return 0;
}
private static final int DROP_INACTIVITY_TIME = 60*1000;
public void failed(OutboundMessageState msg) { failed(msg, true); }

View File

@ -84,7 +84,7 @@ public class RandomIterator<E> implements Iterator<E> {
* <a href="http://www.qbrundage.com/michaelb/pubs/essays/random_number_generation" title="http://www.qbrundage.com/michaelb/pubs/essays/random_number_generation" target="_blank">http://www.qbrundage.com/michaelb/pubs/e&#8230;</a>
* for some implementations, which are faster than java.util.Random.
*/
private static final Random rand = RandomSource.getInstance();
private final Random rand = RandomSource.getInstance();
/** Used to narrow the range to take random indexes from */
private int lower, upper;

View File

@ -120,7 +120,9 @@ public class HTTPMUSocket
return true;
try {
ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf);
// I2P close it instead of leaving group so the thread dies
//ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf);
ssdpMultiSock.close();
ssdpMultiSock = null;
}
catch (Exception e) {

View File

@ -65,6 +65,8 @@ public class ThreadCore implements Runnable
//threadObject.destroy();
//threadObject.stop();
setThreadObject(null);
// I2P break Disposer out of sleep()
threadObject.interrupt();
}
}