diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java
index 3420e3b24..147a20bf0 100644
--- a/router/java/src/net/i2p/router/JobQueue.java
+++ b/router/java/src/net/i2p/router/JobQueue.java
@@ -13,6 +13,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.TreeMap;
+import java.util.SortedMap;
+import java.util.Collections;
import net.i2p.router.message.HandleSourceRouteReplyMessageJob;
import net.i2p.router.networkdb.HandleDatabaseLookupMessageJob;
@@ -43,7 +45,7 @@ public class JobQueue {
/** when true, don't run any new jobs or update any limits, etc */
private boolean _paused;
/** job name to JobStat for that job */
- private TreeMap _jobStats;
+ private SortedMap _jobStats;
/** how many job queue runners can go concurrently */
private int _maxRunners;
private QueuePumper _pumper;
@@ -116,7 +118,7 @@ public class JobQueue {
_timedJobs = new ArrayList();
_queueRunners = new HashMap();
_paused = false;
- _jobStats = new TreeMap();
+ _jobStats = Collections.synchronizedSortedMap(new TreeMap());
_allowParallelOperation = false;
_pumper = new QueuePumper();
I2PThread pumperThread = new I2PThread(_pumper);
@@ -436,13 +438,15 @@ public class JobQueue {
MessageHistory hist = _context.messageHistory();
long uptime = _context.router().getUptime();
- synchronized (_jobStats) {
- if (!_jobStats.containsKey(key))
- _jobStats.put(key, new JobStats(key));
- JobStats stats = (JobStats)_jobStats.get(key);
-
- stats.jobRan(duration, lag);
+ JobStats stats = null;
+ if (!_jobStats.containsKey(key)) {
+ _jobStats.put(key, new JobStats(key));
+ // yes, if two runners finish the same job at the same time, this could
+ // create an extra object. but, who cares, its pushed out of the map
+ // immediately anyway.
}
+ stats = (JobStats)_jobStats.get(key);
+ stats.jobRan(duration, lag);
String dieMsg = null;
@@ -599,15 +603,20 @@ public class JobQueue {
ArrayList readyJobs = null;
ArrayList timedJobs = null;
ArrayList activeJobs = new ArrayList(4);
+ ArrayList justFinishedJobs = new ArrayList(4);
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
synchronized (_queueRunners) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) {
JobQueueRunner runner = (JobQueueRunner)iter.next();
Job job = runner.getCurrentJob();
- if (job != null)
+ if (job != null) {
activeJobs.add(job.getName());
+ } else {
+ job = runner.getLastJob();
+ justFinishedJobs.add(job.getName());
}
+ }
}
StringBuffer buf = new StringBuffer(20*1024);
buf.append("
JobQueue
");
@@ -621,6 +630,11 @@ public class JobQueue {
buf.append("").append(activeJobs.get(i)).append("\n");
}
buf.append("\n");
+ buf.append("# just finished jobs: ").append(justFinishedJobs.size()).append("\n");
+ for (int i = 0; i < justFinishedJobs.size(); i++) {
+ buf.append("- ").append(justFinishedJobs.get(i)).append("
\n");
+ }
+ buf.append("
\n");
buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" (lots of these mean there's likely a big problem)\n");
for (int i = 0; i < readyJobs.size(); i++) {
buf.append("- ").append(readyJobs.get(i)).append("
\n");
@@ -662,7 +676,7 @@ public class JobQueue {
TreeMap tstats = null;
synchronized (_jobStats) {
- tstats = (TreeMap)_jobStats.clone();
+ tstats = new TreeMap(_jobStats);
}
for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) {
diff --git a/router/java/src/net/i2p/router/JobQueueRunner.java b/router/java/src/net/i2p/router/JobQueueRunner.java
index 17f7656df..15694db47 100644
--- a/router/java/src/net/i2p/router/JobQueueRunner.java
+++ b/router/java/src/net/i2p/router/JobQueueRunner.java
@@ -12,6 +12,7 @@ class JobQueueRunner implements Runnable {
private int _id;
private long _numJobs;
private Job _currentJob;
+ private Job _lastJob;
public JobQueueRunner(RouterContext context, int id) {
_context = context;
@@ -19,6 +20,7 @@ class JobQueueRunner implements Runnable {
_keepRunning = true;
_numJobs = 0;
_currentJob = null;
+ _lastJob = null;
_log = _context.logManager().getLog(JobQueueRunner.class);
_context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
@@ -27,6 +29,7 @@ class JobQueueRunner implements Runnable {
}
public Job getCurrentJob() { return _currentJob; }
+ public Job getLastJob() { return _lastJob; }
public int getRunnerId() { return _id; }
public void stopRunning() { _keepRunning = false; }
public void run() {
@@ -51,6 +54,7 @@ class JobQueueRunner implements Runnable {
long betweenJobs = now - lastActive;
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
_currentJob = job;
+ _lastJob = null;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName());
long origStartAfter = job.getTiming().getStartAfter();
@@ -75,6 +79,7 @@ class JobQueueRunner implements Runnable {
_log.debug("Job duration " + duration + "ms for " + job.getName()
+ " with lag of " + (doStart-origStartAfter) + "ms");
lastActive = _context.clock().now();
+ _lastJob = _currentJob;
_currentJob = null;
} catch (Throwable t) {
if (_log.shouldLog(Log.CRIT))
diff --git a/router/java/src/net/i2p/router/JobStats.java b/router/java/src/net/i2p/router/JobStats.java
index c71479a04..d3cf345d0 100644
--- a/router/java/src/net/i2p/router/JobStats.java
+++ b/router/java/src/net/i2p/router/JobStats.java
@@ -5,71 +5,81 @@ import net.i2p.data.DataHelper;
/** glorified struct to contain basic job stats */
class JobStats {
private String _job;
- private long _numRuns;
- private long _totalTime;
- private long _maxTime;
- private long _minTime;
- private long _totalPendingTime;
- private long _maxPendingTime;
- private long _minPendingTime;
-
+ private volatile long _numRuns;
+ private volatile long _totalTime;
+ private volatile long _maxTime;
+ private volatile long _minTime;
+ private volatile long _totalPendingTime;
+ private volatile long _maxPendingTime;
+ private volatile long _minPendingTime;
+
public JobStats(String name) {
- _job = name;
- _numRuns = 0;
- _totalTime = 0;
- _maxTime = -1;
- _minTime = -1;
- _totalPendingTime = 0;
- _maxPendingTime = -1;
- _minPendingTime = -1;
+ _job = name;
+ _numRuns = 0;
+ _totalTime = 0;
+ _maxTime = -1;
+ _minTime = -1;
+ _totalPendingTime = 0;
+ _maxPendingTime = -1;
+ _minPendingTime = -1;
}
-
+
public void jobRan(long runTime, long lag) {
- _numRuns++;
- _totalTime += runTime;
- if ( (_maxTime < 0) || (runTime > _maxTime) )
- _maxTime = runTime;
- if ( (_minTime < 0) || (runTime < _minTime) )
- _minTime = runTime;
- _totalPendingTime += lag;
- if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) )
- _maxPendingTime = lag;
- if ( (_minPendingTime < 0) || (lag < _minPendingTime) )
- _minPendingTime = lag;
+ _numRuns++;
+ _totalTime += runTime;
+ if ( (_maxTime < 0) || (runTime > _maxTime) )
+ _maxTime = runTime;
+ if ( (_minTime < 0) || (runTime < _minTime) )
+ _minTime = runTime;
+ _totalPendingTime += lag;
+ if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) )
+ _maxPendingTime = lag;
+ if ( (_minPendingTime < 0) || (lag < _minPendingTime) )
+ _minPendingTime = lag;
}
-
+
public String getName() { return _job; }
public long getRuns() { return _numRuns; }
public long getTotalTime() { return _totalTime; }
public long getMaxTime() { return _maxTime; }
public long getMinTime() { return _minTime; }
- public long getAvgTime() { if (_numRuns > 0) return _totalTime / _numRuns; else return 0; }
+ public long getAvgTime() {
+ if (_numRuns > 0)
+ return _totalTime / _numRuns;
+ else
+ return 0;
+ }
public long getTotalPendingTime() { return _totalPendingTime; }
public long getMaxPendingTime() { return _maxPendingTime; }
public long getMinPendingTime() { return _minPendingTime; }
- public long getAvgPendingTime() { if (_numRuns > 0) return _totalPendingTime / _numRuns; else return 0; }
-
+ public long getAvgPendingTime() {
+ if (_numRuns > 0)
+ return _totalPendingTime / _numRuns;
+ else
+ return 0;
+ }
+
public int hashCode() { return _job.hashCode(); }
public boolean equals(Object obj) {
- if ( (obj != null) && (obj instanceof JobStats) ) {
- JobStats stats = (JobStats)obj;
- return DataHelper.eq(getName(), stats.getName()) &&
- getRuns() == stats.getRuns() &&
- getTotalTime() == stats.getTotalTime() &&
- getMaxTime() == stats.getMaxTime() &&
- getMinTime() == stats.getMinTime();
- } else {
- return false;
- }
+ if ( (obj != null) && (obj instanceof JobStats) ) {
+ JobStats stats = (JobStats)obj;
+ return DataHelper.eq(getName(), stats.getName()) &&
+ getRuns() == stats.getRuns() &&
+ getTotalTime() == stats.getTotalTime() &&
+ getMaxTime() == stats.getMaxTime() &&
+ getMinTime() == stats.getMinTime();
+ } else {
+ return false;
+ }
}
-
- public String toString() {
- StringBuffer buf = new StringBuffer();
- buf.append("Over ").append(getRuns()).append(" runs, job ").append(getName()).append(" took ");
- buf.append(getTotalTime()).append("ms (").append(getAvgTime()).append("ms/").append(getMaxTime()).append("ms/");
- buf.append(getMinTime()).append("ms avg/max/min) after a total lag of ");
- buf.append(getTotalPendingTime()).append("ms (").append(getAvgPendingTime()).append("ms/");
- buf.append(getMaxPendingTime()).append("ms/").append(getMinPendingTime()).append("ms avg/max/min)");
- return buf.toString();
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("Over ").append(getRuns()).append(" runs, job ").append(getName()).append(" took ");
+ buf.append(getTotalTime()).append("ms (").append(getAvgTime()).append("ms/").append(getMaxTime()).append("ms/");
+ buf.append(getMinTime()).append("ms avg/max/min) after a total lag of ");
+ buf.append(getTotalPendingTime()).append("ms (").append(getAvgPendingTime()).append("ms/");
+ buf.append(getMaxPendingTime()).append("ms/").append(getMinPendingTime()).append("ms avg/max/min)");
+ return buf.toString();
}
}