forked from I2P_Developers/i2p.i2p
removed obsolete code
minor reorganization to help track down whats intermittently b0rking my kaffe instance logging
This commit is contained in:
@ -42,8 +42,6 @@ public class JobQueue {
|
|||||||
private ArrayList _readyJobs;
|
private ArrayList _readyJobs;
|
||||||
/** list of jobs that are scheduled for running in the future */
|
/** list of jobs that are scheduled for running in the future */
|
||||||
private ArrayList _timedJobs;
|
private ArrayList _timedJobs;
|
||||||
/** when true, don't run any new jobs or update any limits, etc */
|
|
||||||
private boolean _paused;
|
|
||||||
/** job name to JobStat for that job */
|
/** job name to JobStat for that job */
|
||||||
private SortedMap _jobStats;
|
private SortedMap _jobStats;
|
||||||
/** how many job queue runners can go concurrently */
|
/** how many job queue runners can go concurrently */
|
||||||
@ -117,7 +115,6 @@ public class JobQueue {
|
|||||||
_readyJobs = new ArrayList();
|
_readyJobs = new ArrayList();
|
||||||
_timedJobs = new ArrayList();
|
_timedJobs = new ArrayList();
|
||||||
_queueRunners = new HashMap();
|
_queueRunners = new HashMap();
|
||||||
_paused = false;
|
|
||||||
_jobStats = Collections.synchronizedSortedMap(new TreeMap());
|
_jobStats = Collections.synchronizedSortedMap(new TreeMap());
|
||||||
_allowParallelOperation = false;
|
_allowParallelOperation = false;
|
||||||
_pumper = new QueuePumper();
|
_pumper = new QueuePumper();
|
||||||
@ -241,9 +238,6 @@ public class JobQueue {
|
|||||||
*/
|
*/
|
||||||
Job getNext() {
|
Job getNext() {
|
||||||
while (_alive) {
|
while (_alive) {
|
||||||
while (_paused) {
|
|
||||||
try { Thread.sleep(30); } catch (InterruptedException ie) {}
|
|
||||||
}
|
|
||||||
Job rv = null;
|
Job rv = null;
|
||||||
int ready = 0;
|
int ready = 0;
|
||||||
synchronized (_readyJobs) {
|
synchronized (_readyJobs) {
|
||||||
@ -254,10 +248,13 @@ public class JobQueue {
|
|||||||
if (rv != null) {
|
if (rv != null) {
|
||||||
// we found one, but there may be more, so wake up enough
|
// we found one, but there may be more, so wake up enough
|
||||||
// other runners
|
// other runners
|
||||||
awaken(ready-1);
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Using a ready job after waking up " + (ready-1) + " others");
|
_log.debug("Waking up " + (ready-1) + " job runners (and running one)");
|
||||||
|
awaken(ready-1);
|
||||||
return rv;
|
return rv;
|
||||||
|
} else {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("No jobs pending, waiting a second");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -266,6 +263,8 @@ public class JobQueue {
|
|||||||
}
|
}
|
||||||
} catch (InterruptedException ie) {}
|
} catch (InterruptedException ie) {}
|
||||||
}
|
}
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn("No longer alive, returning null");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -341,8 +340,6 @@ public class JobQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//public void pauseQueue() { _paused = true; }
|
|
||||||
//public void unpauseQueue() { _paused = false; }
|
|
||||||
void removeRunner(int id) { _queueRunners.remove(new Integer(id)); }
|
void removeRunner(int id) { _queueRunners.remove(new Integer(id)); }
|
||||||
|
|
||||||
|
|
||||||
@ -358,23 +355,6 @@ public class JobQueue {
|
|||||||
_runnerLock.notify();
|
_runnerLock.notify();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int numRunners = 0;
|
|
||||||
synchronized (_queueRunners) {
|
|
||||||
numRunners = _queueRunners.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numRunners > 1) {
|
|
||||||
if (numMadeReady > numRunners) {
|
|
||||||
if (numMadeReady < _maxRunners) {
|
|
||||||
_log.info("Too much job contention (" + numMadeReady + " ready and waiting, " + numRunners + " runners exist), adding " + numMadeReady + " new runners (with max " + _maxRunners + ")");
|
|
||||||
runQueue(numMadeReady);
|
|
||||||
} else {
|
|
||||||
_log.info("Too much job contention (" + numMadeReady + " ready and waiting, " + numRunners + " runners exist), increasing to our max of " + _maxRunners + " runners");
|
|
||||||
runQueue(_maxRunners);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -392,10 +372,6 @@ public class JobQueue {
|
|||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
while (_alive) {
|
while (_alive) {
|
||||||
while (_paused) {
|
|
||||||
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
// periodically update our max runners limit
|
// periodically update our max runners limit
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
if (now > _lastLimitUpdated + MAX_LIMIT_UPDATE_DELAY) {
|
if (now > _lastLimitUpdated + MAX_LIMIT_UPDATE_DELAY) {
|
||||||
|
@ -37,7 +37,11 @@ class JobQueueRunner implements Runnable {
|
|||||||
while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
|
while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
|
||||||
try {
|
try {
|
||||||
Job job = _context.jobQueue().getNext();
|
Job job = _context.jobQueue().getNext();
|
||||||
if (job == null) continue;
|
if (job == null) {
|
||||||
|
if (_log.shouldLog(Log.ERROR))
|
||||||
|
_log.error("getNext returned null - dead?");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
|
|
||||||
long enqueuedTime = 0;
|
long enqueuedTime = 0;
|
||||||
@ -52,7 +56,6 @@ class JobQueueRunner implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
long betweenJobs = now - lastActive;
|
long betweenJobs = now - lastActive;
|
||||||
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
|
|
||||||
_currentJob = job;
|
_currentJob = job;
|
||||||
_lastJob = null;
|
_lastJob = null;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
@ -67,6 +70,7 @@ class JobQueueRunner implements Runnable {
|
|||||||
_context.jobQueue().updateStats(job, doStart, origStartAfter, duration);
|
_context.jobQueue().updateStats(job, doStart, origStartAfter, duration);
|
||||||
long diff = _context.clock().now() - beforeUpdate;
|
long diff = _context.clock().now() - beforeUpdate;
|
||||||
|
|
||||||
|
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
|
||||||
_context.statManager().addRateData("jobQueue.jobRun", duration, duration);
|
_context.statManager().addRateData("jobQueue.jobRun", duration, duration);
|
||||||
_context.statManager().addRateData("jobQueue.jobLag", doStart - origStartAfter, 0);
|
_context.statManager().addRateData("jobQueue.jobLag", doStart - origStartAfter, 0);
|
||||||
_context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime);
|
_context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime);
|
||||||
|
Reference in New Issue
Block a user