diff --git a/router/java/src/net/i2p/router/JobImpl.java b/router/java/src/net/i2p/router/JobImpl.java index f6bafcb959..d5ad4c1f6d 100644 --- a/router/java/src/net/i2p/router/JobImpl.java +++ b/router/java/src/net/i2p/router/JobImpl.java @@ -20,7 +20,6 @@ public abstract class JobImpl implements Job { private final JobTiming _timing; private static final AtomicLong _idSrc = new AtomicLong(); private final long _id; - //private Exception _addedBy; private volatile long _madeReadyOn; public JobImpl(RouterContext context) { @@ -42,18 +41,6 @@ public abstract class JobImpl implements Job { return buf.toString(); } - /** - * @deprecated - * As of 0.8.1, this is a noop, as it just adds classes to the log manager - * class list for no good reason. Logging in jobs is almost always - * set explicitly rather than by class name. - */ - @Deprecated - void addedToQueue() { - //if (_context.logManager().getLog(getClass()).shouldLog(Log.DEBUG)) - // _addedBy = new Exception(); - } - /** * @deprecated * @return null always diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index b5f7ae1011..22326f1c5c 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -156,7 +156,6 @@ public class JobQueue { _context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRequiredRateStat("jobQueue.jobLag", "Job run delay (ms)", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l }); - //_context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _readyJobs = new LinkedBlockingQueue(); _timedJobs = new TreeSet(new JobComparator()); @@ -173,10 +172,6 @@ public class JobQueue { public void addJob(Job job) { if (job == null || !_alive) return; - // This does nothing - //if (job instanceof JobImpl) - // ((JobImpl)job).addedToQueue(); - long numReady; boolean alreadyExists = false; boolean dropped = false; @@ -487,11 +482,8 @@ public class JobQueue { /** * Start up the queue with the specified number of concurrent processors. - * If this method has already been called, it will adjust the number of - * runners to meet the new number. This does not kill jobs running on - * excess threads, it merely instructs the threads to die after finishing - * the current job. - * + * If this method has already been called, it will increase the number of + * runners if necessary. This does not ever stop or reduce threads. */ public synchronized void runQueue(int numThreads) { // we're still starting up [serially] and we've got at least one runner, @@ -511,14 +503,18 @@ public class JobQueue { runner.start(); } } else if (_queueRunners.size() == numThreads) { - for (JobQueueRunner runner : _queueRunners.values()) { - runner.startRunning(); - } + //for (JobQueueRunner runner : _queueRunners.values()) { + // runner.startRunning(); + //} + if (_log.shouldWarn()) + _log.warn("Already have " + numThreads + " threads"); } else { // numThreads < # runners, so shrink //for (int i = _queueRunners.size(); i > numThreads; i++) { // QueueRunner runner = (QueueRunner)_queueRunners.get(new Integer(i)); // runner.stopRunning(); //} + if (_log.shouldWarn()) + _log.warn("Already have " + _queueRunners.size() + " threads, not decreasing"); } } diff --git a/router/java/src/net/i2p/router/JobQueueRunner.java b/router/java/src/net/i2p/router/JobQueueRunner.java index 07be132f13..d79d4e1a4e 100644 --- a/router/java/src/net/i2p/router/JobQueueRunner.java +++ b/router/java/src/net/i2p/router/JobQueueRunner.java @@ -14,7 +14,6 @@ class JobQueueRunner extends I2PThread { private volatile Job _lastJob; private volatile long _lastBegin; private volatile long _lastEnd; - //private volatile int _state; public JobQueueRunner(RouterContext context, int id) { _context = context; @@ -23,11 +22,8 @@ class JobQueueRunner extends I2PThread { _log = _context.logManager().getLog(JobQueueRunner.class); setPriority(NORM_PRIORITY + 1); // all createRateStat in JobQueue - //_state = 1; } - //final int getState() { return _state; } - public Job getCurrentJob() { return _currentJob; } public Job getLastJob() { return _lastJob; } public int getRunnerId() { return _id; } @@ -35,16 +31,13 @@ class JobQueueRunner extends I2PThread { public void startRunning() { _keepRunning = true; } public long getLastBegin() { return _lastBegin; } public long getLastEnd() { return _lastEnd; } + public void run() { - //_state = 2; long lastActive = _context.clock().now(); - while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) { - //_state = 3; + while (_keepRunning && _context.jobQueue().isAlive()) { try { Job job = _context.jobQueue().getNext(); - //_state = 4; if (job == null) { - //_state = 5; if (_context.router().isAlive()) if (_log.shouldLog(Log.ERROR)) _log.error("getNext returned null - dead?"); @@ -54,41 +47,32 @@ class JobQueueRunner extends I2PThread { long enqueuedTime = 0; if (job instanceof JobImpl) { - //_state = 6; long when = ((JobImpl)job).getMadeReadyOn(); if (when <= 0) { - //_state = 7; _log.error("Job was not made ready?! " + job, new Exception("Not made ready?!")); } else { - //_state = 8; enqueuedTime = now - when; } } _currentJob = job; _lastJob = null; - //_state = 9; if (_log.shouldLog(Log.DEBUG)) _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName()); long origStartAfter = job.getTiming().getStartAfter(); long doStart = _context.clock().now(); - //_state = 10; job.getTiming().start(); runCurrentJob(); job.getTiming().end(); - //_state = 11; long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart(); long beforeUpdate = _context.clock().now(); - //_state = 12; _context.jobQueue().updateStats(job, doStart, origStartAfter, duration); - //_state = 13; long diff = _context.clock().now() - beforeUpdate; long lag = doStart - origStartAfter; if (lag < 0) lag = 0; - //_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs); _context.statManager().addRateData("jobQueue.jobRun", duration, duration); _context.statManager().addRateData("jobQueue.jobLag", lag); _context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime); @@ -100,8 +84,6 @@ class JobQueueRunner extends I2PThread { + ") on job " + _currentJob); } - //_state = 14; - if (diff > 100) { if (_log.shouldLog(Log.WARN)) _log.warn("Updating statistics for the job took too long [" + diff + "ms]"); @@ -113,27 +95,19 @@ class JobQueueRunner extends I2PThread { _lastJob = _currentJob; _currentJob = null; _lastEnd = lastActive; - //_state = 15; - - //if ( (jobNum % 10) == 0) - // System.gc(); } catch (Throwable t) { _log.log(Log.CRIT, "error running?", t); } } - //_state = 16; if (_context.router().isAlive()) _log.log(Log.CRIT, "Queue runner " + _id + " exiting"); _context.jobQueue().removeRunner(_id); - //_state = 17; } private void runCurrentJob() { try { - //_state = 18; _lastBegin = _context.clock().now(); _currentJob.runJob(); - //_state = 19; } catch (OutOfMemoryError oom) { try { if (SystemVersion.isAndroid()) @@ -142,7 +116,6 @@ class JobQueueRunner extends I2PThread { fireOOM(oom); } catch (Throwable t) {} } catch (Throwable t) { - //_state = 21; _log.log(Log.CRIT, "Error processing job [" + _currentJob.getName() + "] on thread " + _id + ": " + t, t); }