diff --git a/apps/routerconsole/java/src/net/i2p/router/web/JobQueueHelper.java b/apps/routerconsole/java/src/net/i2p/router/web/JobQueueHelper.java index 6593717a5e..e270bf7ffc 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/JobQueueHelper.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/JobQueueHelper.java @@ -140,11 +140,13 @@ public class JobQueueHelper extends HelperBase { private void getJobStats(StringBuilder buf) { buf.append("\n" + "" + + "" + "" + "\n"); long totRuns = 0; + long totDropped = 0; long totExecTime = 0; long avgExecTime = 0; long maxExecTime = -1; @@ -161,6 +163,7 @@ public class JobQueueHelper extends HelperBase { buf.append(""); buf.append(""); buf.append(""); + buf.append(""); buf.append(""); buf.append(""); buf.append(""); @@ -171,6 +174,7 @@ public class JobQueueHelper extends HelperBase { buf.append(""); buf.append("\n"); totRuns += stats.getRuns(); + totDropped += stats.getDropped(); totExecTime += stats.getTotalTime(); if (stats.getMaxTime() > maxExecTime) maxExecTime = stats.getMaxTime(); @@ -193,6 +197,7 @@ public class JobQueueHelper extends HelperBase { buf.append(""); buf.append(""); buf.append(""); + buf.append(""); buf.append(""); buf.append(""); buf.append(""); diff --git a/history.txt b/history.txt index 8e67a9ea3b..ee12a696c1 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,9 @@ +2015-03-15 zzz + * Job Queue: + - Fix overload dropping + - Add drop count to job stats + - Decrease overload threshold again + 2015-03-13 zzz * i2psnark: - Auto-reduce tunnel quantity based on peer count diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index 1b85adcfb6..f25b907fe6 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -51,7 +51,7 @@ public class JobQueue { /** SortedSet of jobs that are scheduled for running in the future, earliest first */ private final Set _timedJobs; /** job name to JobStat for that job */ - private final Map _jobStats; + private final ConcurrentHashMap _jobStats; private final QueuePumper _pumper; /** will we allow the # job runners to grow beyond 1? */ private volatile boolean _allowParallelOperation; @@ -113,8 +113,8 @@ public class JobQueue { /** max ready and waiting jobs before we start dropping 'em */ private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; - private final static int DEFAULT_MAX_WAITING_JOBS = 50; - private final static long MIN_LAG_TO_DROP = 1000; + private final static int DEFAULT_MAX_WAITING_JOBS = 25; + private final static long MIN_LAG_TO_DROP = 500; /** @deprecated unimplemented */ private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; @@ -218,6 +218,15 @@ public class JobQueue { _context.statManager().addRateData("jobQueue.droppedJobs", 1); _log.logAlways(Log.WARN, "Dropping job due to overload! # ready jobs: " + numReady + ": job = " + job); + String key = job.getName(); + JobStats stats = _jobStats.get(key); + if (stats == null) { + stats = new JobStats(key); + JobStats old = _jobStats.putIfAbsent(key, stats); + if (old != null) + stats = old; + } + stats.jobDropped(); } } @@ -292,14 +301,23 @@ public class JobQueue { // // Garlic added in 0.9.19, floodfills were getting overloaded // with encrypted lookups + // + // Obviously we can only drop one-shot jobs, not those that requeue + // if (cls == HandleFloodfillDatabaseLookupMessageJob.class || cls == HandleGarlicMessageJob.class) { - JobTiming jt = job.getTiming(); - if (jt != null) { - long lag = _context.clock().now() - jt.getStartAfter(); - if (lag >= MIN_LAG_TO_DROP) - return true; - } + // this tail drops based on the lag at the tail, which + // makes no sense... + //JobTiming jt = job.getTiming(); + //if (jt != null) { + // long lag = _context.clock().now() - jt.getStartAfter(); + // if (lag >= MIN_LAG_TO_DROP) + // return true; + //} + + // this tail drops based on the lag at the head + if (getMaxLag() >= MIN_LAG_TO_DROP) + return true; } } return false; @@ -611,10 +629,9 @@ public class JobQueue { JobStats stats = _jobStats.get(key); if (stats == null) { stats = new JobStats(key); - _jobStats.put(key, stats); - // yes, if two runners finish the same job at the same time, this could - // create an extra object. but, who cares, its pushed out of the map - // immediately anyway. + JobStats old = _jobStats.putIfAbsent(key, stats); + if (old != null) + stats = old; } stats.jobRan(duration, lag); diff --git a/router/java/src/net/i2p/router/JobStats.java b/router/java/src/net/i2p/router/JobStats.java index 8e6b222d1c..2fd008b6d6 100644 --- a/router/java/src/net/i2p/router/JobStats.java +++ b/router/java/src/net/i2p/router/JobStats.java @@ -12,10 +12,11 @@ import net.i2p.data.DataHelper; public class JobStats { private final String _job; private final AtomicLong _numRuns = new AtomicLong(); - private volatile long _totalTime; + private final AtomicLong _numDropped = new AtomicLong(); + private final AtomicLong _totalTime = new AtomicLong(); private volatile long _maxTime; private volatile long _minTime; - private volatile long _totalPendingTime; + private final AtomicLong _totalPendingTime = new AtomicLong(); private volatile long _maxPendingTime; private volatile long _minPendingTime; @@ -29,43 +30,52 @@ public class JobStats { public void jobRan(long runTime, long lag) { _numRuns.incrementAndGet(); - _totalTime += runTime; + _totalTime.addAndGet(runTime); if ( (_maxTime < 0) || (runTime > _maxTime) ) _maxTime = runTime; if ( (_minTime < 0) || (runTime < _minTime) ) _minTime = runTime; - _totalPendingTime += lag; + _totalPendingTime.addAndGet(lag); if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) ) _maxPendingTime = lag; if ( (_minPendingTime < 0) || (lag < _minPendingTime) ) _minPendingTime = lag; } + /** @since 0.9.19 */ + public void jobDropped() { + _numDropped.incrementAndGet(); + } + + /** @since 0.9.19 */ + public long getDropped() { return _numDropped.get(); } + public String getName() { return _job; } public long getRuns() { return _numRuns.get(); } - public long getTotalTime() { return _totalTime; } + public long getTotalTime() { return _totalTime.get(); } public long getMaxTime() { return _maxTime; } public long getMinTime() { return _minTime; } public long getAvgTime() { long numRuns = _numRuns.get(); if (numRuns > 0) - return _totalTime / numRuns; + return _totalTime.get() / numRuns; else return 0; } - public long getTotalPendingTime() { return _totalPendingTime; } + public long getTotalPendingTime() { return _totalPendingTime.get(); } public long getMaxPendingTime() { return _maxPendingTime; } public long getMinPendingTime() { return _minPendingTime; } public long getAvgPendingTime() { long numRuns = _numRuns.get(); if (numRuns > 0) - return _totalPendingTime / numRuns; + return _totalPendingTime.get() / numRuns; else return 0; } @Override public int hashCode() { return _job.hashCode(); } + @Override public boolean equals(Object obj) { if ( (obj != null) && (obj instanceof JobStats) ) { diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 282c18b422..0725033fa6 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 2; + public final static long BUILD = 3; /** for example "-test" */ public final static String EXTRA = "";
").append(_("Job")).append("").append(_("Runs")).append("").append(_("Dropped")).append("").append(_("Time")).append("").append(_("Avg")).append("") .append(_("Max")).append("").append(_("Min")).append("").append(_("Pending")).append("").append(_("Avg")).append("") .append(_("Max")).append("").append(_("Min")).append("
").append(stats.getName()).append("").append(stats.getRuns()).append("").append(stats.getDropped()).append("").append(DataHelper.formatDuration2(stats.getTotalTime())).append("").append(DataHelper.formatDuration2(stats.getAvgTime())).append("").append(DataHelper.formatDuration2(stats.getMaxTime())).append("").append(DataHelper.formatDuration2(stats.getMinPendingTime())).append("
").append(_("Summary")).append("").append(totRuns).append("").append(totDropped).append("").append(DataHelper.formatDuration2(totExecTime)).append("").append(DataHelper.formatDuration2(avgExecTime)).append("").append(DataHelper.formatDuration2(maxExecTime)).append("