diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index d261387d5..ff76086cb 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -44,7 +44,7 @@ public class JobQueue { /** job name to JobStat for that job */ private SortedMap _jobStats; /** how many job queue runners can go concurrently */ - private int _maxRunners; + private int _maxRunners = 1; private QueuePumper _pumper; /** will we allow the # job runners to grow beyond 1? */ private boolean _allowParallelOperation; @@ -170,6 +170,7 @@ public class JobQueue { } else { synchronized (_timedJobs) { _timedJobs.add(job); + _timedJobs.notifyAll(); } } } else { @@ -300,48 +301,6 @@ public class JobQueue { return null; } - /** - * Move newly ready timed jobs to the ready queue. Returns the - * number of ready jobs after the check is completed - * - */ - private int checkJobTimings() { - boolean newJobsReady = false; - long now = _context.clock().now(); - ArrayList toAdd = null; - synchronized (_timedJobs) { - for (int i = 0; i < _timedJobs.size(); i++) { - Job j = (Job)_timedJobs.get(i); - // find jobs due to start before now - if (j.getTiming().getStartAfter() <= now) { - if (j instanceof JobImpl) - ((JobImpl)j).madeReady(); - - if (toAdd == null) toAdd = new ArrayList(4); - toAdd.add(j); - _timedJobs.remove(i); - i--; // so the index stays consistent - } - } - } - - int ready = 0; - synchronized (_readyJobs) { - if (toAdd != null) { - // rather than addAll, which allocs a byte array rv before adding, - // we iterate, since toAdd is usually going to only be 1 or 2 entries - // and since readyJobs will often have the space, we can avoid the - // extra alloc. (no, i'm not just being insane - i'm updating this based - // on some profiling data ;) - for (int i = 0; i < toAdd.size(); i++) - _readyJobs.add(toAdd.get(i)); - } - ready = _readyJobs.size(); - } - - return ready; - } - /** * Start up the queue with the specified number of concurrent processors. * If this method has already been called, it will adjust the number of @@ -384,7 +343,6 @@ public class JobQueue { void removeRunner(int id) { _queueRunners.remove(new Integer(id)); } - /** * Notify a sufficient number of waiting runners, and if necessary, increase * the number of runners (up to maxRunners) @@ -406,30 +364,56 @@ public class JobQueue { * */ private final class QueuePumper implements Runnable, Clock.ClockUpdateListener { - private long _lastLimitUpdated; public QueuePumper() { - _lastLimitUpdated = 0; _context.clock().addUpdateListener(this); } public void run() { try { while (_alive) { - // periodically update our max runners limit long now = _context.clock().now(); - if (now > _lastLimitUpdated + MAX_LIMIT_UPDATE_DELAY) { - if (_log.shouldLog(Log.INFO)) - _log.info("Updating the limits"); - updateMaxLimit(); - updateTimingLimits(); - _lastLimitUpdated = now; + long timeToWait = 0; + ArrayList toAdd = null; + synchronized (_timedJobs) { + for (int i = 0; i < _timedJobs.size(); i++) { + Job j = (Job)_timedJobs.get(i); + // find jobs due to start before now + long timeLeft = j.getTiming().getStartAfter() - now; + if (timeLeft <= 0) { + if (j instanceof JobImpl) + ((JobImpl)j).madeReady(); + + if (toAdd == null) toAdd = new ArrayList(4); + toAdd.add(j); + _timedJobs.remove(i); + i--; // so the index stays consistent + } else { + if ( (timeToWait <= 0) || (timeLeft < timeToWait) ) + timeToWait = timeLeft; + } + } + if (toAdd == null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Waiting " + timeToWait + " before rechecking the timed queue"); + try { + _timedJobs.wait(timeToWait); + } catch (InterruptedException ie) {} + } } + - // turn timed jobs into ready jobs - int numMadeReady = checkJobTimings(); - - awaken(numMadeReady); - - try { Thread.sleep(500); } catch (InterruptedException ie) {} + if (toAdd != null) { + synchronized (_readyJobs) { + // rather than addAll, which allocs a byte array rv before adding, + // we iterate, since toAdd is usually going to only be 1 or 2 entries + // and since readyJobs will often have the space, we can avoid the + // extra alloc. (no, i'm not just being insane - i'm updating this based + // on some profiling data ;) + for (int i = 0; i < toAdd.size(); i++) + _readyJobs.add(toAdd.get(i)); + } + + awaken(toAdd.size()); + } } } catch (Throwable t) { _context.clock().removeUpdateListener(this); @@ -439,9 +423,10 @@ public class JobQueue { } public void offsetChanged(long delta) { - if (_lastLimitUpdated > 0) - _lastLimitUpdated += delta; updateJobTimings(delta); + synchronized (_timedJobs) { + _timedJobs.notifyAll(); + } } } @@ -530,130 +515,6 @@ public class JobQueue { } } - //// - // update config params - //// - - /** - * Update the max number of job queue runners - * - */ - private void updateMaxLimit() { - if (_context.router() == null) { - _maxRunners = DEFAULT_MAX_RUNNERS; - return; - } - String str = _context.router().getConfigSetting(PROP_MAX_RUNNERS); - if (str != null) { - try { - _maxRunners = Integer.parseInt(str); - return; - } catch (NumberFormatException nfe) { - _log.error("Invalid maximum job runners [" + str + "]"); - } - } - if (_log.shouldLog(Log.INFO)) - _log.info("Defaulting the maximum job runners to " + DEFAULT_MAX_RUNNERS); - _maxRunners = DEFAULT_MAX_RUNNERS; - } - - /** - * Update the job lag and run threshold for warnings and fatalities, as well - * as the warmup time before which fatalities will be ignored - * - */ - private void updateTimingLimits() { - if (_context.router() == null) { - _lagWarning = DEFAULT_LAG_WARNING; - _lagFatal = DEFAULT_LAG_FATAL; - _runWarning = DEFAULT_RUN_WARNING; - _runFatal = DEFAULT_RUN_FATAL; - _warmupTime = DEFAULT_WARMUP_TIME; - _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; - return; - } - String str = _context.router().getConfigSetting(PROP_LAG_WARNING); - if (str != null) { - try { - _lagWarning = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - _log.error("Invalid job lag warning [" + str + "]"); - _lagWarning = DEFAULT_LAG_WARNING; - } - } else { - _lagWarning = DEFAULT_LAG_WARNING; - } - if (_log.shouldLog(Log.INFO)) - _log.info("Setting the warning job lag time to " + _lagWarning + "ms"); - - str = _context.router().getConfigSetting(PROP_LAG_FATAL); - if (str != null) { - try { - _lagFatal = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - _log.error("Invalid job lag fatal [" + str + "]"); - _lagFatal = DEFAULT_LAG_FATAL; - } - } else { - _lagFatal = DEFAULT_LAG_FATAL; - } - if (_log.shouldLog(Log.INFO)) - _log.info("Setting the fatal job lag time to " + _lagFatal + "ms"); - - str = _context.router().getConfigSetting(PROP_RUN_WARNING); - if (str != null) { - try { - _runWarning = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - _log.error("Invalid job run warning [" + str + "]"); - _runWarning = DEFAULT_RUN_WARNING; - } - } else { - _runWarning = DEFAULT_RUN_WARNING; - } - if (_log.shouldLog(Log.INFO)) - _log.info("Setting the warning job run time to " + _runWarning + "ms"); - - str = _context.router().getConfigSetting(PROP_RUN_FATAL); - if (str != null) { - try { - _runFatal = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - _log.error("Invalid job run fatal [" + str + "]"); - _runFatal = DEFAULT_RUN_FATAL; - } - } else { - _runFatal = DEFAULT_RUN_FATAL; - } - if (_log.shouldLog(Log.INFO)) - _log.info("Setting the fatal job run time to " + _runFatal + "ms"); - - str = _context.router().getConfigSetting(PROP_WARMUM_TIME); - if (str != null) { - try { - _warmupTime = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - _log.error("Invalid warmup time [" + str + "]"); - _warmupTime = DEFAULT_WARMUP_TIME; - } - } else { - _warmupTime = DEFAULT_WARMUP_TIME; - } - - str = _context.router().getConfigSetting(PROP_MAX_WAITING_JOBS); - if (str != null) { - try { - _maxWaitingJobs = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - _log.error("Invalid max waiting jobs [" + str + "]"); - _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; - } - } else { - _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; - } - if (_log.shouldLog(Log.INFO)) - _log.info("Setting the max waiting jobs to " + _maxWaitingJobs); - } //// // the remainder are utility methods for dumping status info