From 303e257841e46576c3053d40c6b4c7a07ca8245b Mon Sep 17 00:00:00 2001 From: jrandom Date: Fri, 7 May 2004 17:51:28 +0000 Subject: [PATCH] synchronization reduction and keep track of the 'last' job for each runner (to help debug something i see once a week on kaffe) --- router/java/src/net/i2p/router/JobQueue.java | 34 ++++-- .../src/net/i2p/router/JobQueueRunner.java | 5 + router/java/src/net/i2p/router/JobStats.java | 112 ++++++++++-------- 3 files changed, 90 insertions(+), 61 deletions(-) diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index 3420e3b24..147a20bf0 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -13,6 +13,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.ArrayList; import java.util.TreeMap; +import java.util.SortedMap; +import java.util.Collections; import net.i2p.router.message.HandleSourceRouteReplyMessageJob; import net.i2p.router.networkdb.HandleDatabaseLookupMessageJob; @@ -43,7 +45,7 @@ public class JobQueue { /** when true, don't run any new jobs or update any limits, etc */ private boolean _paused; /** job name to JobStat for that job */ - private TreeMap _jobStats; + private SortedMap _jobStats; /** how many job queue runners can go concurrently */ private int _maxRunners; private QueuePumper _pumper; @@ -116,7 +118,7 @@ public class JobQueue { _timedJobs = new ArrayList(); _queueRunners = new HashMap(); _paused = false; - _jobStats = new TreeMap(); + _jobStats = Collections.synchronizedSortedMap(new TreeMap()); _allowParallelOperation = false; _pumper = new QueuePumper(); I2PThread pumperThread = new I2PThread(_pumper); @@ -436,13 +438,15 @@ public class JobQueue { MessageHistory hist = _context.messageHistory(); long uptime = _context.router().getUptime(); - synchronized (_jobStats) { - if (!_jobStats.containsKey(key)) - _jobStats.put(key, new JobStats(key)); - JobStats stats = (JobStats)_jobStats.get(key); - - stats.jobRan(duration, lag); + JobStats stats = null; + if (!_jobStats.containsKey(key)) { + _jobStats.put(key, new JobStats(key)); + // yes, if two runners finish the same job at the same time, this could + // create an extra object. but, who cares, its pushed out of the map + // immediately anyway. } + stats = (JobStats)_jobStats.get(key); + stats.jobRan(duration, lag); String dieMsg = null; @@ -599,15 +603,20 @@ public class JobQueue { ArrayList readyJobs = null; ArrayList timedJobs = null; ArrayList activeJobs = new ArrayList(4); + ArrayList justFinishedJobs = new ArrayList(4); synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } synchronized (_queueRunners) { for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) { JobQueueRunner runner = (JobQueueRunner)iter.next(); Job job = runner.getCurrentJob(); - if (job != null) + if (job != null) { activeJobs.add(job.getName()); + } else { + job = runner.getLastJob(); + justFinishedJobs.add(job.getName()); } + } } StringBuffer buf = new StringBuffer(20*1024); buf.append("

JobQueue

"); @@ -621,6 +630,11 @@ public class JobQueue { buf.append("
  • ").append(activeJobs.get(i)).append("
  • \n"); } buf.append("\n"); + buf.append("# just finished jobs: ").append(justFinishedJobs.size()).append("
      \n"); + for (int i = 0; i < justFinishedJobs.size(); i++) { + buf.append("
    1. ").append(justFinishedJobs.get(i)).append("
    2. \n"); + } + buf.append("
    \n"); buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" (lots of these mean there's likely a big problem)
      \n"); for (int i = 0; i < readyJobs.size(); i++) { buf.append("
    1. ").append(readyJobs.get(i)).append("
    2. \n"); @@ -662,7 +676,7 @@ public class JobQueue { TreeMap tstats = null; synchronized (_jobStats) { - tstats = (TreeMap)_jobStats.clone(); + tstats = new TreeMap(_jobStats); } for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) { diff --git a/router/java/src/net/i2p/router/JobQueueRunner.java b/router/java/src/net/i2p/router/JobQueueRunner.java index 17f7656df..15694db47 100644 --- a/router/java/src/net/i2p/router/JobQueueRunner.java +++ b/router/java/src/net/i2p/router/JobQueueRunner.java @@ -12,6 +12,7 @@ class JobQueueRunner implements Runnable { private int _id; private long _numJobs; private Job _currentJob; + private Job _lastJob; public JobQueueRunner(RouterContext context, int id) { _context = context; @@ -19,6 +20,7 @@ class JobQueueRunner implements Runnable { _keepRunning = true; _numJobs = 0; _currentJob = null; + _lastJob = null; _log = _context.logManager().getLog(JobQueueRunner.class); _context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); @@ -27,6 +29,7 @@ class JobQueueRunner implements Runnable { } public Job getCurrentJob() { return _currentJob; } + public Job getLastJob() { return _lastJob; } public int getRunnerId() { return _id; } public void stopRunning() { _keepRunning = false; } public void run() { @@ -51,6 +54,7 @@ class JobQueueRunner implements Runnable { long betweenJobs = now - lastActive; _context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs); _currentJob = job; + _lastJob = null; if (_log.shouldLog(Log.DEBUG)) _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName()); long origStartAfter = job.getTiming().getStartAfter(); @@ -75,6 +79,7 @@ class JobQueueRunner implements Runnable { _log.debug("Job duration " + duration + "ms for " + job.getName() + " with lag of " + (doStart-origStartAfter) + "ms"); lastActive = _context.clock().now(); + _lastJob = _currentJob; _currentJob = null; } catch (Throwable t) { if (_log.shouldLog(Log.CRIT)) diff --git a/router/java/src/net/i2p/router/JobStats.java b/router/java/src/net/i2p/router/JobStats.java index c71479a04..d3cf345d0 100644 --- a/router/java/src/net/i2p/router/JobStats.java +++ b/router/java/src/net/i2p/router/JobStats.java @@ -5,71 +5,81 @@ import net.i2p.data.DataHelper; /** glorified struct to contain basic job stats */ class JobStats { private String _job; - private long _numRuns; - private long _totalTime; - private long _maxTime; - private long _minTime; - private long _totalPendingTime; - private long _maxPendingTime; - private long _minPendingTime; - + private volatile long _numRuns; + private volatile long _totalTime; + private volatile long _maxTime; + private volatile long _minTime; + private volatile long _totalPendingTime; + private volatile long _maxPendingTime; + private volatile long _minPendingTime; + public JobStats(String name) { - _job = name; - _numRuns = 0; - _totalTime = 0; - _maxTime = -1; - _minTime = -1; - _totalPendingTime = 0; - _maxPendingTime = -1; - _minPendingTime = -1; + _job = name; + _numRuns = 0; + _totalTime = 0; + _maxTime = -1; + _minTime = -1; + _totalPendingTime = 0; + _maxPendingTime = -1; + _minPendingTime = -1; } - + public void jobRan(long runTime, long lag) { - _numRuns++; - _totalTime += runTime; - if ( (_maxTime < 0) || (runTime > _maxTime) ) - _maxTime = runTime; - if ( (_minTime < 0) || (runTime < _minTime) ) - _minTime = runTime; - _totalPendingTime += lag; - if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) ) - _maxPendingTime = lag; - if ( (_minPendingTime < 0) || (lag < _minPendingTime) ) - _minPendingTime = lag; + _numRuns++; + _totalTime += runTime; + if ( (_maxTime < 0) || (runTime > _maxTime) ) + _maxTime = runTime; + if ( (_minTime < 0) || (runTime < _minTime) ) + _minTime = runTime; + _totalPendingTime += lag; + if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) ) + _maxPendingTime = lag; + if ( (_minPendingTime < 0) || (lag < _minPendingTime) ) + _minPendingTime = lag; } - + public String getName() { return _job; } public long getRuns() { return _numRuns; } public long getTotalTime() { return _totalTime; } public long getMaxTime() { return _maxTime; } public long getMinTime() { return _minTime; } - public long getAvgTime() { if (_numRuns > 0) return _totalTime / _numRuns; else return 0; } + public long getAvgTime() { + if (_numRuns > 0) + return _totalTime / _numRuns; + else + return 0; + } public long getTotalPendingTime() { return _totalPendingTime; } public long getMaxPendingTime() { return _maxPendingTime; } public long getMinPendingTime() { return _minPendingTime; } - public long getAvgPendingTime() { if (_numRuns > 0) return _totalPendingTime / _numRuns; else return 0; } - + public long getAvgPendingTime() { + if (_numRuns > 0) + return _totalPendingTime / _numRuns; + else + return 0; + } + public int hashCode() { return _job.hashCode(); } public boolean equals(Object obj) { - if ( (obj != null) && (obj instanceof JobStats) ) { - JobStats stats = (JobStats)obj; - return DataHelper.eq(getName(), stats.getName()) && - getRuns() == stats.getRuns() && - getTotalTime() == stats.getTotalTime() && - getMaxTime() == stats.getMaxTime() && - getMinTime() == stats.getMinTime(); - } else { - return false; - } + if ( (obj != null) && (obj instanceof JobStats) ) { + JobStats stats = (JobStats)obj; + return DataHelper.eq(getName(), stats.getName()) && + getRuns() == stats.getRuns() && + getTotalTime() == stats.getTotalTime() && + getMaxTime() == stats.getMaxTime() && + getMinTime() == stats.getMinTime(); + } else { + return false; + } } - - public String toString() { - StringBuffer buf = new StringBuffer(); - buf.append("Over ").append(getRuns()).append(" runs, job ").append(getName()).append(" took "); - buf.append(getTotalTime()).append("ms (").append(getAvgTime()).append("ms/").append(getMaxTime()).append("ms/"); - buf.append(getMinTime()).append("ms avg/max/min) after a total lag of "); - buf.append(getTotalPendingTime()).append("ms (").append(getAvgPendingTime()).append("ms/"); - buf.append(getMaxPendingTime()).append("ms/").append(getMinPendingTime()).append("ms avg/max/min)"); - return buf.toString(); + + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("Over ").append(getRuns()).append(" runs, job ").append(getName()).append(" took "); + buf.append(getTotalTime()).append("ms (").append(getAvgTime()).append("ms/").append(getMaxTime()).append("ms/"); + buf.append(getMinTime()).append("ms avg/max/min) after a total lag of "); + buf.append(getTotalPendingTime()).append("ms (").append(getAvgPendingTime()).append("ms/"); + buf.append(getMaxPendingTime()).append("ms/").append(getMinPendingTime()).append("ms avg/max/min)"); + return buf.toString(); } }