Router: Job Queue cleanups and javadocs (ticket #2590)

This commit is contained in:
zzz
2019-08-29 12:15:13 +00:00
parent e6dcfaee15
commit 5f3da69acc
3 changed files with 11 additions and 55 deletions

View File

@ -20,7 +20,6 @@ public abstract class JobImpl implements Job {
private final JobTiming _timing;
private static final AtomicLong _idSrc = new AtomicLong();
private final long _id;
//private Exception _addedBy;
private volatile long _madeReadyOn;
public JobImpl(RouterContext context) {
@ -42,18 +41,6 @@ public abstract class JobImpl implements Job {
return buf.toString();
}
/**
* @deprecated
* As of 0.8.1, this is a noop, as it just adds classes to the log manager
* class list for no good reason. Logging in jobs is almost always
* set explicitly rather than by class name.
*/
@Deprecated
void addedToQueue() {
//if (_context.logManager().getLog(getClass()).shouldLog(Log.DEBUG))
// _addedBy = new Exception();
}
/**
* @deprecated
* @return null always

View File

@ -156,7 +156,6 @@ public class JobQueue {
_context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRequiredRateStat("jobQueue.jobLag", "Job run delay (ms)", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
//_context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_readyJobs = new LinkedBlockingQueue<Job>();
_timedJobs = new TreeSet<Job>(new JobComparator());
@ -173,10 +172,6 @@ public class JobQueue {
public void addJob(Job job) {
if (job == null || !_alive) return;
// This does nothing
//if (job instanceof JobImpl)
// ((JobImpl)job).addedToQueue();
long numReady;
boolean alreadyExists = false;
boolean dropped = false;
@ -487,11 +482,8 @@ public class JobQueue {
/**
* Start up the queue with the specified number of concurrent processors.
* If this method has already been called, it will adjust the number of
* runners to meet the new number. This does not kill jobs running on
* excess threads, it merely instructs the threads to die after finishing
* the current job.
*
* If this method has already been called, it will increase the number of
* runners if necessary. This does not ever stop or reduce threads.
*/
public synchronized void runQueue(int numThreads) {
// we're still starting up [serially] and we've got at least one runner,
@ -511,14 +503,18 @@ public class JobQueue {
runner.start();
}
} else if (_queueRunners.size() == numThreads) {
for (JobQueueRunner runner : _queueRunners.values()) {
runner.startRunning();
}
//for (JobQueueRunner runner : _queueRunners.values()) {
// runner.startRunning();
//}
if (_log.shouldWarn())
_log.warn("Already have " + numThreads + " threads");
} else { // numThreads < # runners, so shrink
//for (int i = _queueRunners.size(); i > numThreads; i++) {
// QueueRunner runner = (QueueRunner)_queueRunners.get(new Integer(i));
// runner.stopRunning();
//}
if (_log.shouldWarn())
_log.warn("Already have " + _queueRunners.size() + " threads, not decreasing");
}
}

View File

@ -14,7 +14,6 @@ class JobQueueRunner extends I2PThread {
private volatile Job _lastJob;
private volatile long _lastBegin;
private volatile long _lastEnd;
//private volatile int _state;
public JobQueueRunner(RouterContext context, int id) {
_context = context;
@ -23,11 +22,8 @@ class JobQueueRunner extends I2PThread {
_log = _context.logManager().getLog(JobQueueRunner.class);
setPriority(NORM_PRIORITY + 1);
// all createRateStat in JobQueue
//_state = 1;
}
//final int getState() { return _state; }
public Job getCurrentJob() { return _currentJob; }
public Job getLastJob() { return _lastJob; }
public int getRunnerId() { return _id; }
@ -35,16 +31,13 @@ class JobQueueRunner extends I2PThread {
public void startRunning() { _keepRunning = true; }
public long getLastBegin() { return _lastBegin; }
public long getLastEnd() { return _lastEnd; }
public void run() {
//_state = 2;
long lastActive = _context.clock().now();
while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
//_state = 3;
while (_keepRunning && _context.jobQueue().isAlive()) {
try {
Job job = _context.jobQueue().getNext();
//_state = 4;
if (job == null) {
//_state = 5;
if (_context.router().isAlive())
if (_log.shouldLog(Log.ERROR))
_log.error("getNext returned null - dead?");
@ -54,41 +47,32 @@ class JobQueueRunner extends I2PThread {
long enqueuedTime = 0;
if (job instanceof JobImpl) {
//_state = 6;
long when = ((JobImpl)job).getMadeReadyOn();
if (when <= 0) {
//_state = 7;
_log.error("Job was not made ready?! " + job,
new Exception("Not made ready?!"));
} else {
//_state = 8;
enqueuedTime = now - when;
}
}
_currentJob = job;
_lastJob = null;
//_state = 9;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName());
long origStartAfter = job.getTiming().getStartAfter();
long doStart = _context.clock().now();
//_state = 10;
job.getTiming().start();
runCurrentJob();
job.getTiming().end();
//_state = 11;
long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart();
long beforeUpdate = _context.clock().now();
//_state = 12;
_context.jobQueue().updateStats(job, doStart, origStartAfter, duration);
//_state = 13;
long diff = _context.clock().now() - beforeUpdate;
long lag = doStart - origStartAfter;
if (lag < 0) lag = 0;
//_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
_context.statManager().addRateData("jobQueue.jobRun", duration, duration);
_context.statManager().addRateData("jobQueue.jobLag", lag);
_context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime);
@ -100,8 +84,6 @@ class JobQueueRunner extends I2PThread {
+ ") on job " + _currentJob);
}
//_state = 14;
if (diff > 100) {
if (_log.shouldLog(Log.WARN))
_log.warn("Updating statistics for the job took too long [" + diff + "ms]");
@ -113,27 +95,19 @@ class JobQueueRunner extends I2PThread {
_lastJob = _currentJob;
_currentJob = null;
_lastEnd = lastActive;
//_state = 15;
//if ( (jobNum % 10) == 0)
// System.gc();
} catch (Throwable t) {
_log.log(Log.CRIT, "error running?", t);
}
}
//_state = 16;
if (_context.router().isAlive())
_log.log(Log.CRIT, "Queue runner " + _id + " exiting");
_context.jobQueue().removeRunner(_id);
//_state = 17;
}
private void runCurrentJob() {
try {
//_state = 18;
_lastBegin = _context.clock().now();
_currentJob.runJob();
//_state = 19;
} catch (OutOfMemoryError oom) {
try {
if (SystemVersion.isAndroid())
@ -142,7 +116,6 @@ class JobQueueRunner extends I2PThread {
fireOOM(oom);
} catch (Throwable t) {}
} catch (Throwable t) {
//_state = 21;
_log.log(Log.CRIT, "Error processing job [" + _currentJob.getName()
+ "] on thread " + _id + ": " + t, t);
}