Job Queue:

- Fix overload dropping
- Add drop count to job stats
- Decrease overload threshold again
- Concurrent tweaks
This commit is contained in:
zzz
2015-03-15 11:48:12 +00:00
parent 4c6aaa32b6
commit 76cf80a3d0
5 changed files with 60 additions and 22 deletions

View File

@ -140,11 +140,13 @@ public class JobQueueHelper extends HelperBase {
private void getJobStats(StringBuilder buf) {
buf.append("<table>\n" +
"<tr><th>").append(_("Job")).append("</th><th>").append(_("Runs")).append("</th>" +
"<th>").append(_("Dropped")).append("</th>" +
"<th>").append(_("Time")).append("</th><th><i>").append(_("Avg")).append("</i></th><th><i>")
.append(_("Max")).append("</i></th><th><i>").append(_("Min")).append("</i></th>" +
"<th>").append(_("Pending")).append("</th><th><i>").append(_("Avg")).append("</i></th><th><i>")
.append(_("Max")).append("</i></th><th><i>").append(_("Min")).append("</i></th></tr>\n");
long totRuns = 0;
long totDropped = 0;
long totExecTime = 0;
long avgExecTime = 0;
long maxExecTime = -1;
@ -161,6 +163,7 @@ public class JobQueueHelper extends HelperBase {
buf.append("<tr>");
buf.append("<td><b>").append(stats.getName()).append("</b></td>");
buf.append("<td align=\"right\">").append(stats.getRuns()).append("</td>");
buf.append("<td align=\"right\">").append(stats.getDropped()).append("</td>");
buf.append("<td align=\"right\">").append(DataHelper.formatDuration2(stats.getTotalTime())).append("</td>");
buf.append("<td align=\"right\">").append(DataHelper.formatDuration2(stats.getAvgTime())).append("</td>");
buf.append("<td align=\"right\">").append(DataHelper.formatDuration2(stats.getMaxTime())).append("</td>");
@ -171,6 +174,7 @@ public class JobQueueHelper extends HelperBase {
buf.append("<td align=\"right\">").append(DataHelper.formatDuration2(stats.getMinPendingTime())).append("</td>");
buf.append("</tr>\n");
totRuns += stats.getRuns();
totDropped += stats.getDropped();
totExecTime += stats.getTotalTime();
if (stats.getMaxTime() > maxExecTime)
maxExecTime = stats.getMaxTime();
@ -193,6 +197,7 @@ public class JobQueueHelper extends HelperBase {
buf.append("<tr class=\"tablefooter\">");
buf.append("<td><b>").append(_("Summary")).append("</b></td>");
buf.append("<td align=\"right\">").append(totRuns).append("</td>");
buf.append("<td align=\"right\">").append(totDropped).append("</td>");
buf.append("<td align=\"right\">").append(DataHelper.formatDuration2(totExecTime)).append("</td>");
buf.append("<td align=\"right\">").append(DataHelper.formatDuration2(avgExecTime)).append("</td>");
buf.append("<td align=\"right\">").append(DataHelper.formatDuration2(maxExecTime)).append("</td>");

View File

@ -1,3 +1,9 @@
2015-03-15 zzz
* Job Queue:
- Fix overload dropping
- Add drop count to job stats
- Decrease overload threshold again
2015-03-13 zzz
* i2psnark:
- Auto-reduce tunnel quantity based on peer count

View File

@ -51,7 +51,7 @@ public class JobQueue {
/** SortedSet of jobs that are scheduled for running in the future, earliest first */
private final Set<Job> _timedJobs;
/** job name to JobStat for that job */
private final Map<String, JobStats> _jobStats;
private final ConcurrentHashMap<String, JobStats> _jobStats;
private final QueuePumper _pumper;
/** will we allow the # job runners to grow beyond 1? */
private volatile boolean _allowParallelOperation;
@ -113,8 +113,8 @@ public class JobQueue {
/** max ready and waiting jobs before we start dropping 'em */
private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS;
private final static int DEFAULT_MAX_WAITING_JOBS = 50;
private final static long MIN_LAG_TO_DROP = 1000;
private final static int DEFAULT_MAX_WAITING_JOBS = 25;
private final static long MIN_LAG_TO_DROP = 500;
/** @deprecated unimplemented */
private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs";
@ -218,6 +218,15 @@ public class JobQueue {
_context.statManager().addRateData("jobQueue.droppedJobs", 1);
_log.logAlways(Log.WARN, "Dropping job due to overload! # ready jobs: "
+ numReady + ": job = " + job);
String key = job.getName();
JobStats stats = _jobStats.get(key);
if (stats == null) {
stats = new JobStats(key);
JobStats old = _jobStats.putIfAbsent(key, stats);
if (old != null)
stats = old;
}
stats.jobDropped();
}
}
@ -292,14 +301,23 @@ public class JobQueue {
//
// Garlic added in 0.9.19, floodfills were getting overloaded
// with encrypted lookups
//
// Obviously we can only drop one-shot jobs, not those that requeue
//
if (cls == HandleFloodfillDatabaseLookupMessageJob.class ||
cls == HandleGarlicMessageJob.class) {
JobTiming jt = job.getTiming();
if (jt != null) {
long lag = _context.clock().now() - jt.getStartAfter();
if (lag >= MIN_LAG_TO_DROP)
return true;
}
// this tail drops based on the lag at the tail, which
// makes no sense...
//JobTiming jt = job.getTiming();
//if (jt != null) {
// long lag = _context.clock().now() - jt.getStartAfter();
// if (lag >= MIN_LAG_TO_DROP)
// return true;
//}
// this tail drops based on the lag at the head
if (getMaxLag() >= MIN_LAG_TO_DROP)
return true;
}
}
return false;
@ -611,10 +629,9 @@ public class JobQueue {
JobStats stats = _jobStats.get(key);
if (stats == null) {
stats = new JobStats(key);
_jobStats.put(key, stats);
// Yes, if two runners finish the same job at the same time, this could
// create an extra object. But who cares; it's pushed out of the map
// immediately anyway.
JobStats old = _jobStats.putIfAbsent(key, stats);
if (old != null)
stats = old;
}
stats.jobRan(duration, lag);

View File

@ -12,10 +12,11 @@ import net.i2p.data.DataHelper;
public class JobStats {
private final String _job;
private final AtomicLong _numRuns = new AtomicLong();
private volatile long _totalTime;
private final AtomicLong _numDropped = new AtomicLong();
private final AtomicLong _totalTime = new AtomicLong();
private volatile long _maxTime;
private volatile long _minTime;
private volatile long _totalPendingTime;
private final AtomicLong _totalPendingTime = new AtomicLong();
private volatile long _maxPendingTime;
private volatile long _minPendingTime;
@ -29,43 +30,52 @@ public class JobStats {
/**
 * Records one completed run of this job: bumps the run counter, adds the
 * run time and the lag to their running totals, and widens the stored
 * max/min run time and max/min pending time when the new sample exceeds them.
 *
 * A negative stored max or min is treated as "not yet set" and is replaced
 * by the first sample (presumably the fields start out negative; the
 * constructor is not visible here - confirm).
 *
 * NOTE(review): the max/min fields are plain volatile longs, and each update
 * below is a separate read-compare-write. With several threads calling this
 * concurrently an update can be lost; the counter and the totals do not have
 * this problem when backed by AtomicLong.
 *
 * @param runTime how long the job took to execute (presumably milliseconds - confirm against caller)
 * @param lag how long the job waited before it started (presumably milliseconds - confirm against caller)
 */
public void jobRan(long runTime, long lag) {
_numRuns.incrementAndGet();
// NOTE(review): both a "+=" and an addAndGet() update of the same total appear
// in this listing; they look like the before/after lines of a diff - confirm
// that only one of them exists in the real source
_totalTime += runTime;
_totalTime.addAndGet(runTime);
// execution-time extremes
if ( (_maxTime < 0) || (runTime > _maxTime) )
_maxTime = runTime;
if ( (_minTime < 0) || (runTime < _minTime) )
_minTime = runTime;
// NOTE(review): same duplicated old/new update pattern as for _totalTime above
_totalPendingTime += lag;
_totalPendingTime.addAndGet(lag);
// pending-time (lag) extremes
if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) )
_maxPendingTime = lag;
if ( (_minPendingTime < 0) || (lag < _minPendingTime) )
_minPendingTime = lag;
}
/**
 * Counts one dropped instance of this job by atomically incrementing the
 * drop counter. Run counts and timing totals are not touched.
 *
 * @since 0.9.19
 */
public void jobDropped() {
_numDropped.incrementAndGet();
}
/**
 * @return the current value of the drop counter that {@link #jobDropped()} increments
 * @since 0.9.19
 */
public long getDropped() { return _numDropped.get(); }
public String getName() { return _job; }
public long getRuns() { return _numRuns.get(); }
public long getTotalTime() { return _totalTime; }
public long getTotalTime() { return _totalTime.get(); }
public long getMaxTime() { return _maxTime; }
public long getMinTime() { return _minTime; }
/**
 * Returns the mean run time: the accumulated total run time divided by the
 * number of recorded runs, using integer division.
 *
 * @return the average run time, or 0 when no runs have been recorded
 */
public long getAvgTime() {
long numRuns = _numRuns.get();
// only divide once at least one run has been recorded (avoids division by zero)
if (numRuns > 0)
// NOTE(review): two return statements appear here (plain field vs. AtomicLong.get());
// they look like the before/after lines of a diff - confirm which one is live
return _totalTime / numRuns;
return _totalTime.get() / numRuns;
else
return 0;
}
public long getTotalPendingTime() { return _totalPendingTime; }
public long getTotalPendingTime() { return _totalPendingTime.get(); }
public long getMaxPendingTime() { return _maxPendingTime; }
public long getMinPendingTime() { return _minPendingTime; }
/**
 * Returns the mean pending time: the accumulated total pending time (lag)
 * divided by the number of recorded runs, using integer division.
 *
 * @return the average pending time, or 0 when no runs have been recorded
 */
public long getAvgPendingTime() {
long numRuns = _numRuns.get();
// only divide once at least one run has been recorded (avoids division by zero)
if (numRuns > 0)
// NOTE(review): two return statements appear here (plain field vs. AtomicLong.get());
// they look like the before/after lines of a diff - confirm which one is live
return _totalPendingTime / numRuns;
return _totalPendingTime.get() / numRuns;
else
return 0;
}
@Override
public int hashCode() { return _job.hashCode(); }
@Override
public boolean equals(Object obj) {
if ( (obj != null) && (obj instanceof JobStats) ) {

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 2;
public final static long BUILD = 3;
/** for example "-test" */
public final static String EXTRA = "";