* Threads:

- Reduce thread pool sizes based on memory and/or bandwidth limits
      - Tweak some thread names for clarity
This commit is contained in:
zzz
2010-12-04 18:54:06 +00:00
parent 6dfd9bca69
commit 9aaf95ca98
12 changed files with 129 additions and 68 deletions

View File

@ -428,7 +428,7 @@ public class PeerCoordinator implements PeerListener
peer.runConnection(_util, listener, bitfield); peer.runConnection(_util, listener, bitfield);
} }
}; };
String threadName = peer.toString(); String threadName = "Snark peer " + peer.toString();
new I2PAppThread(r, threadName).start(); new I2PAppThread(r, threadName).start();
return true; return true;
} }

View File

@ -28,12 +28,14 @@ import net.i2p.I2PAppContext;
public class SimpleScheduler { public class SimpleScheduler {
private static final SimpleScheduler _instance = new SimpleScheduler(); private static final SimpleScheduler _instance = new SimpleScheduler();
public static SimpleScheduler getInstance() { return _instance; } public static SimpleScheduler getInstance() { return _instance; }
private static final int THREADS = 4; private static final int MIN_THREADS = 2;
private static final int MAX_THREADS = 4;
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private ScheduledThreadPoolExecutor _executor; private ScheduledThreadPoolExecutor _executor;
private String _name; private String _name;
private int _count; private int _count;
private final int _threads;
protected SimpleScheduler() { this("SimpleScheduler"); } protected SimpleScheduler() { this("SimpleScheduler"); }
protected SimpleScheduler(String name) { protected SimpleScheduler(String name) {
@ -41,7 +43,9 @@ public class SimpleScheduler {
_log = _context.logManager().getLog(SimpleScheduler.class); _log = _context.logManager().getLog(SimpleScheduler.class);
_name = name; _name = name;
_count = 0; _count = 0;
_executor = new ScheduledThreadPoolExecutor(THREADS, new CustomThreadFactory()); long maxMemory = Runtime.getRuntime().maxMemory();
_threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024))));
_executor = new ScheduledThreadPoolExecutor(_threads, new CustomThreadFactory());
_executor.prestartAllCoreThreads(); _executor.prestartAllCoreThreads();
} }
@ -90,7 +94,7 @@ public class SimpleScheduler {
private class CustomThreadFactory implements ThreadFactory { private class CustomThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r); Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName(_name + ' ' + (++_count) + '/' + THREADS); rv.setName(_name + ' ' + (++_count) + '/' + _threads);
// Uncomment this to test threadgrouping, but we should be all safe now that the constructor preallocates! // Uncomment this to test threadgrouping, but we should be all safe now that the constructor preallocates!
// String name = rv.getThreadGroup().getName(); // String name = rv.getThreadGroup().getName();
// if(!name.equals("main")) { // if(!name.equals("main")) {

View File

@ -18,14 +18,16 @@ import net.i2p.I2PAppContext;
public class SimpleTimer { public class SimpleTimer {
private static final SimpleTimer _instance = new SimpleTimer(); private static final SimpleTimer _instance = new SimpleTimer();
public static SimpleTimer getInstance() { return _instance; } public static SimpleTimer getInstance() { return _instance; }
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
/** event time (Long) to event (TimedEvent) mapping */ /** event time (Long) to event (TimedEvent) mapping */
private final TreeMap _events; private final TreeMap _events;
/** event (TimedEvent) to event time (Long) mapping */ /** event (TimedEvent) to event time (Long) mapping */
private Map _eventTimes; private Map _eventTimes;
private final List _readyEvents; private final List _readyEvents;
private SimpleStore runn; private SimpleStore runn;
private static final int MIN_THREADS = 2;
private static final int MAX_THREADS = 4;
protected SimpleTimer() { this("SimpleTimer"); } protected SimpleTimer() { this("SimpleTimer"); }
protected SimpleTimer(String name) { protected SimpleTimer(String name) {
@ -39,9 +41,11 @@ public class SimpleTimer {
runner.setName(name); runner.setName(name);
runner.setDaemon(true); runner.setDaemon(true);
runner.start(); runner.start();
for (int i = 0; i < 3; i++) { long maxMemory = Runtime.getRuntime().maxMemory();
int threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024))));
for (int i = 1; i <= threads ; i++) {
I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn)); I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
executor.setName(name + "Executor " + i); executor.setName(name + "Executor " + i + '/' + threads);
executor.setDaemon(true); executor.setDaemon(true);
executor.start(); executor.start();
} }

View File

@ -27,12 +27,14 @@ import net.i2p.I2PAppContext;
public class SimpleTimer2 { public class SimpleTimer2 {
private static final SimpleTimer2 _instance = new SimpleTimer2(); private static final SimpleTimer2 _instance = new SimpleTimer2();
public static SimpleTimer2 getInstance() { return _instance; } public static SimpleTimer2 getInstance() { return _instance; }
private static final int THREADS = 4; private static final int MIN_THREADS = 2;
private static final int MAX_THREADS = 4;
private I2PAppContext _context; private I2PAppContext _context;
private static Log _log; // static so TimedEvent can use it private static Log _log; // static so TimedEvent can use it
private ScheduledThreadPoolExecutor _executor; private ScheduledThreadPoolExecutor _executor;
private String _name; private String _name;
private int _count; private int _count;
private final int _threads;
protected SimpleTimer2() { this("SimpleTimer2"); } protected SimpleTimer2() { this("SimpleTimer2"); }
protected SimpleTimer2(String name) { protected SimpleTimer2(String name) {
@ -40,7 +42,9 @@ public class SimpleTimer2 {
_log = _context.logManager().getLog(SimpleTimer2.class); _log = _context.logManager().getLog(SimpleTimer2.class);
_name = name; _name = name;
_count = 0; _count = 0;
_executor = new CustomScheduledThreadPoolExecutor(THREADS, new CustomThreadFactory()); long maxMemory = Runtime.getRuntime().maxMemory();
_threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024))));
_executor = new CustomScheduledThreadPoolExecutor(_threads, new CustomThreadFactory());
_executor.prestartAllCoreThreads(); _executor.prestartAllCoreThreads();
} }
@ -67,7 +71,7 @@ public class SimpleTimer2 {
private class CustomThreadFactory implements ThreadFactory { private class CustomThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r); Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName(_name + ' ' + (++_count) + '/' + THREADS); rv.setName(_name + ' ' + (++_count) + '/' + _threads);
// Uncomment this to test threadgrouping, but we should be all safe now that the constructor preallocates! // Uncomment this to test threadgrouping, but we should be all safe now that the constructor preallocates!
// String name = rv.getThreadGroup().getName(); // String name = rv.getThreadGroup().getName();
// if(!name.equals("main")) { // if(!name.equals("main")) {

View File

@ -395,10 +395,8 @@ public class JobQueue {
for (int i = _queueRunners.size(); i < numThreads; i++) { for (int i = _queueRunners.size(); i < numThreads; i++) {
JobQueueRunner runner = new JobQueueRunner(_context, i); JobQueueRunner runner = new JobQueueRunner(_context, i);
_queueRunners.put(Integer.valueOf(i), runner); _queueRunners.put(Integer.valueOf(i), runner);
Thread t = new I2PThread(runner); Thread t = new I2PThread(runner, "JobQueue " + (++_runnerId) + '/' + numThreads, false);
t.setName("JobQueue"+(_runnerId++));
//t.setPriority(I2PThread.MAX_PRIORITY-1); //t.setPriority(I2PThread.MAX_PRIORITY-1);
t.setDaemon(false);
t.start(); t.start();
} }
} else if (_queueRunners.size() == numThreads) { } else if (_queueRunners.size() == numThreads) {

View File

@ -24,22 +24,27 @@ import net.i2p.util.Log;
* @author zzz * @author zzz
*/ */
public class NTCPSendFinisher { public class NTCPSendFinisher {
private static final int THREADS = 4; private static final int MIN_THREADS = 1;
private static final int MAX_THREADS = 4;
private final I2PAppContext _context; private final I2PAppContext _context;
private final NTCPTransport _transport; private final NTCPTransport _transport;
private final Log _log; private final Log _log;
private int _count; private static int _count;
private ThreadPoolExecutor _executor; private ThreadPoolExecutor _executor;
private static int _threads;
public NTCPSendFinisher(I2PAppContext context, NTCPTransport transport) { public NTCPSendFinisher(I2PAppContext context, NTCPTransport transport) {
_context = context; _context = context;
_log = _context.logManager().getLog(NTCPSendFinisher.class); _log = _context.logManager().getLog(NTCPSendFinisher.class);
_transport = transport; _transport = transport;
_context.statManager().createRateStat("ntcp.sendFinishTime", "How long to queue and excecute msg.afterSend()", "ntcp", new long[] {5*1000});
} }
public void start() { public void start() {
_count = 0; _count = 0;
_executor = new CustomThreadPoolExecutor(); long maxMemory = Runtime.getRuntime().maxMemory();
_threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024))));
_executor = new CustomThreadPoolExecutor(_threads);
} }
public void stop() { public void stop() {
@ -57,18 +62,18 @@ public class NTCPSendFinisher {
} }
// not really needed for now but in case we want to add some hooks like afterExecute() // not really needed for now but in case we want to add some hooks like afterExecute()
private class CustomThreadPoolExecutor extends ThreadPoolExecutor { private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor() { public CustomThreadPoolExecutor(int num) {
// use unbounded queue, so maximumPoolSize and keepAliveTime have no effect // use unbounded queue, so maximumPoolSize and keepAliveTime have no effect
super(THREADS, THREADS, 1000, TimeUnit.MILLISECONDS, super(num, num, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(), new CustomThreadFactory()); new LinkedBlockingQueue(), new CustomThreadFactory());
} }
} }
private class CustomThreadFactory implements ThreadFactory { private static class CustomThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r); Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName("NTCPSendFinisher " + (++_count) + '/' + THREADS); rv.setName("NTCPSendFinisher " + (++_count) + '/' + _threads);
rv.setDaemon(true); rv.setDaemon(true);
return rv; return rv;
} }
@ -78,15 +83,18 @@ public class NTCPSendFinisher {
* Call afterSend() for the message * Call afterSend() for the message
*/ */
private class RunnableEvent implements Runnable { private class RunnableEvent implements Runnable {
private OutNetMessage _msg; private final OutNetMessage _msg;
private final long _queued;
public RunnableEvent(OutNetMessage msg) { public RunnableEvent(OutNetMessage msg) {
_msg = msg; _msg = msg;
_queued = _context.clock().now();
} }
public void run() { public void run() {
try { try {
_transport.afterSend(_msg, true, false, _msg.getSendTime()); _transport.afterSend(_msg, true, false, _msg.getSendTime());
_context.statManager().addRateData("ntcp.sendFinishTime", _context.clock().now() - _queued, 0);
} catch (Throwable t) { } catch (Throwable t) {
_log.log(Log.CRIT, " wtf, afterSend borked", t); _log.log(Log.CRIT, " wtf, afterSend borked", t);
} }

View File

@ -433,8 +433,10 @@ public class NTCPTransport extends TransportImpl {
return skews; return skews;
} }
private static final int NUM_CONCURRENT_READERS = 3; private static final int MIN_CONCURRENT_READERS = 2; // unless < 32MB
private static final int NUM_CONCURRENT_WRITERS = 3; private static final int MIN_CONCURRENT_WRITERS = 2; // unless < 32MB
private static final int MAX_CONCURRENT_READERS = 4;
private static final int MAX_CONCURRENT_WRITERS = 4;
/** /**
* Called by TransportManager. * Called by TransportManager.
@ -449,12 +451,8 @@ public class NTCPTransport extends TransportImpl {
if (_pumper.isAlive()) if (_pumper.isAlive())
return _myAddress != null ? _myAddress.toRouterAddress() : null; return _myAddress != null ? _myAddress.toRouterAddress() : null;
if (_log.shouldLog(Log.WARN)) _log.warn("Starting ntcp transport listening"); if (_log.shouldLog(Log.WARN)) _log.warn("Starting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
_writer.startWriting(NUM_CONCURRENT_WRITERS);
startIt();
configureLocalAddress(); configureLocalAddress();
return bindAddress(); return bindAddress();
} }
@ -471,12 +469,8 @@ public class NTCPTransport extends TransportImpl {
if (_pumper.isAlive()) if (_pumper.isAlive())
return _myAddress != null ? _myAddress.toRouterAddress() : null; return _myAddress != null ? _myAddress.toRouterAddress() : null;
if (_log.shouldLog(Log.WARN)) _log.warn("Restarting ntcp transport listening"); if (_log.shouldLog(Log.WARN)) _log.warn("Restarting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
_reader.startReading(NUM_CONCURRENT_READERS);
_writer.startWriting(NUM_CONCURRENT_WRITERS);
startIt();
if (addr == null) if (addr == null)
_myAddress = null; _myAddress = null;
else else
@ -484,6 +478,28 @@ public class NTCPTransport extends TransportImpl {
return bindAddress(); return bindAddress();
} }
/**
* Start up. Caller must synchronize.
* @since 0.8.3
*/
private void startIt() {
_finisher.start();
_pumper.startPumping();
long maxMemory = Runtime.getRuntime().maxMemory();
int nr, nw;
if (maxMemory < 32*1024*1024) {
nr = nw = 1;
} else if (maxMemory < 64*1024*1024) {
nr = nw = 2;
} else {
nr = Math.max(MIN_CONCURRENT_READERS, Math.min(MAX_CONCURRENT_READERS, _context.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
nw = Math.max(MIN_CONCURRENT_WRITERS, Math.min(MAX_CONCURRENT_WRITERS, _context.bandwidthLimiter().getOutboundKBytesPerSecond() / 20));
}
_reader.startReading(nr);
_writer.startWriting(nw);
}
public boolean isAlive() { public boolean isAlive() {
return _pumper.isAlive(); return _pumper.isAlive();
} }

View File

@ -15,13 +15,13 @@ import net.i2p.util.Log;
* *
*/ */
class Reader { class Reader {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
// TODO change to LBQ ?? // TODO change to LBQ ??
private final List<NTCPConnection> _pendingConnections; private final List<NTCPConnection> _pendingConnections;
private List<NTCPConnection> _liveReads; private final List<NTCPConnection> _liveReads;
private List<NTCPConnection> _readAfterLive; private final List<NTCPConnection> _readAfterLive;
private List<Runner> _runners; private final List<Runner> _runners;
public Reader(RouterContext ctx) { public Reader(RouterContext ctx) {
_context = ctx; _context = ctx;
@ -33,9 +33,9 @@ class Reader {
} }
public void startReading(int numReaders) { public void startReading(int numReaders) {
for (int i = 0; i < numReaders; i++) { for (int i = 1; i <= numReaders; i++) {
Runner r = new Runner(); Runner r = new Runner();
I2PThread t = new I2PThread(r, "NTCP read " + i, true); I2PThread t = new I2PThread(r, "NTCP reader " + i + '/' + numReaders, true);
_runners.add(r); _runners.add(r);
t.start(); t.start();
} }

View File

@ -14,12 +14,12 @@ import net.i2p.util.Log;
* *
*/ */
class Writer { class Writer {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private final List<NTCPConnection> _pendingConnections; private final List<NTCPConnection> _pendingConnections;
private List<NTCPConnection> _liveWrites; private final List<NTCPConnection> _liveWrites;
private List<NTCPConnection> _writeAfterLive; private final List<NTCPConnection> _writeAfterLive;
private List<Runner> _runners; private final List<Runner> _runners;
public Writer(RouterContext ctx) { public Writer(RouterContext ctx) {
_context = ctx; _context = ctx;
@ -31,9 +31,9 @@ class Writer {
} }
public void startWriting(int numWriters) { public void startWriting(int numWriters) {
for (int i = 0; i < numWriters; i++) { for (int i = 1; i <=numWriters; i++) {
Runner r = new Runner(); Runner r = new Runner();
I2PThread t = new I2PThread(r, "NTCP write " + i, true); I2PThread t = new I2PThread(r, "NTCP writer " + i + '/' + numWriters, true);
_runners.add(r); _runners.add(r);
t.start(); t.start();
} }

View File

@ -27,7 +27,9 @@ class MessageReceiver {
private final BlockingQueue<InboundMessageState> _completeMessages; private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive; private boolean _alive;
//private ByteCache _cache; //private ByteCache _cache;
private static final int THREADS = 5; private static final int MIN_THREADS = 2; // unless < 32MB
private static final int MAX_THREADS = 5;
private final int _threadCount;
private static final long POISON_IMS = -99999999999l; private static final long POISON_IMS = -99999999999l;
public MessageReceiver(RouterContext ctx, UDPTransport transport) { public MessageReceiver(RouterContext ctx, UDPTransport transport) {
@ -35,10 +37,19 @@ class MessageReceiver {
_log = ctx.logManager().getLog(MessageReceiver.class); _log = ctx.logManager().getLog(MessageReceiver.class);
_transport = transport; _transport = transport;
_completeMessages = new LinkedBlockingQueue(); _completeMessages = new LinkedBlockingQueue();
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory < 32*1024*1024)
_threadCount = 1;
else if (maxMemory < 64*1024*1024)
_threadCount = 2;
else
_threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
// the runners run forever, no need to have a cache // the runners run forever, no need to have a cache
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); //_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
@ -49,8 +60,8 @@ class MessageReceiver {
public void startup() { public void startup() {
_alive = true; _alive = true;
for (int i = 0; i < THREADS; i++) { for (int i = 0; i < _threadCount; i++) {
I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i + '/' + THREADS, true); I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + (i+1) + '/' + _threadCount, true);
t.start(); t.start();
} }
} }
@ -64,7 +75,7 @@ class MessageReceiver {
public void shutdown() { public void shutdown() {
_alive = false; _alive = false;
_completeMessages.clear(); _completeMessages.clear();
for (int i = 0; i < THREADS; i++) { for (int i = 0; i < _threadCount; i++) {
InboundMessageState ims = new InboundMessageState(_context, POISON_IMS, null); InboundMessageState ims = new InboundMessageState(_context, POISON_IMS, null);
_completeMessages.offer(ims); _completeMessages.offer(ims);
} }
@ -119,8 +130,8 @@ class MessageReceiver {
if (message != null) { if (message != null) {
long before = System.currentTimeMillis(); long before = System.currentTimeMillis();
if (remaining > 0) //if (remaining > 0)
_context.statManager().addRateData("udp.inboundRemaining", remaining, 0); // _context.statManager().addRateData("udp.inboundRemaining", remaining, 0);
int size = message.getCompleteSize(); int size = message.getCompleteSize();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime()); _log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());

View File

@ -31,11 +31,13 @@ class PacketHandler {
private boolean _keepReading; private boolean _keepReading;
private final Handler[] _handlers; private final Handler[] _handlers;
private static final int NUM_HANDLERS = 5; private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB
private static final int MAX_NUM_HANDLERS = 5;
/** let packets be up to 30s slow */ /** let packets be up to 30s slow */
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;
PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) {// LINT -- Exporting non-public type through public API PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher,
InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(PacketHandler.class); _log = ctx.logManager().getLog(PacketHandler.class);
_transport = transport; _transport = transport;
@ -44,10 +46,20 @@ class PacketHandler {
_inbound = inbound; _inbound = inbound;
_testManager = testManager; _testManager = testManager;
_introManager = introManager; _introManager = introManager;
_handlers = new Handler[NUM_HANDLERS];
for (int i = 0; i < NUM_HANDLERS; i++) { long maxMemory = Runtime.getRuntime().maxMemory();
int num_handlers;
if (maxMemory < 32*1024*1024)
num_handlers = 1;
else if (maxMemory < 64*1024*1024)
num_handlers = 2;
else
num_handlers = Math.max(MIN_NUM_HANDLERS, Math.min(MAX_NUM_HANDLERS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
_handlers = new Handler[num_handlers];
for (int i = 0; i < num_handlers; i++) {
_handlers[i] = new Handler(); _handlers[i] = new Handler();
} }
_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", UDPTransport.RATES);
@ -79,8 +91,8 @@ class PacketHandler {
public void startup() { public void startup() {
_keepReading = true; _keepReading = true;
for (int i = 0; i < NUM_HANDLERS; i++) { for (int i = 0; i < _handlers.length; i++) {
I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + i + '/' + NUM_HANDLERS, true); I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + (i+1) + '/' + _handlers.length, true);
t.start(); t.start();
} }
} }
@ -91,8 +103,8 @@ class PacketHandler {
String getHandlerStatus() { String getHandlerStatus() {
StringBuilder rv = new StringBuilder(); StringBuilder rv = new StringBuilder();
rv.append("Handlers: ").append(NUM_HANDLERS); rv.append("Handlers: ").append(_handlers.length);
for (int i = 0; i < NUM_HANDLERS; i++) { for (int i = 0; i < _handlers.length; i++) {
Handler handler = _handlers[i]; Handler handler = _handlers[i];
rv.append(" handler ").append(i).append(" state: ").append(handler._state); rv.append(" handler ").append(i).append(" state: ").append(handler._state);
} }

View File

@ -16,22 +16,26 @@ public class TunnelGatewayPumper implements Runnable {
private RouterContext _context; private RouterContext _context;
private final BlockingQueue<PumpedTunnelGateway> _wantsPumping; private final BlockingQueue<PumpedTunnelGateway> _wantsPumping;
private boolean _stop; private boolean _stop;
private static final int PUMPERS = 4; private static final int MIN_PUMPERS = 1;
private static final int MAX_PUMPERS = 4;
private final int _pumpers;
/** Creates a new instance of TunnelGatewayPumper */ /** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) { public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx; _context = ctx;
_wantsPumping = new LinkedBlockingQueue(); _wantsPumping = new LinkedBlockingQueue();
_stop = false; _stop = false;
for (int i = 0; i < PUMPERS; i++) long maxMemory = Runtime.getRuntime().maxMemory();
new I2PThread(this, "Tunnel GW pumper " + i + '/' + PUMPERS, true).start(); _pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024))));
for (int i = 0; i < _pumpers; i++)
new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true).start();
} }
public void stopPumping() { public void stopPumping() {
_stop=true; _stop=true;
_wantsPumping.clear(); _wantsPumping.clear();
PumpedTunnelGateway poison = new PoisonPTG(_context); PumpedTunnelGateway poison = new PoisonPTG(_context);
for (int i = 0; i < PUMPERS; i++) for (int i = 0; i < _pumpers; i++)
_wantsPumping.offer(poison); _wantsPumping.offer(poison);
for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) { for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
try { try {