2004-11-02 jrandom
* Fix for a long standing synchronization bug in the JobQueue (and added some kooky flags to make sure it stays dead) * Update the ministreaming lib to force mode=guaranteed if the default lib is used, and mode=best_effort for all other libs.
This commit is contained in:
@ -89,7 +89,8 @@ public class I2PSocketManagerFactory {
|
||||
if (!opts.containsKey(name))
|
||||
opts.setProperty(name, System.getProperty(name));
|
||||
}
|
||||
if (true) {
|
||||
boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER));
|
||||
if (oldLib) {
|
||||
// for the old streaming lib
|
||||
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
|
||||
//opts.setProperty("tunnels.depthInbound", "0");
|
||||
|
@ -1,28 +1,51 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
/**
|
||||
*
|
||||
* Usage: StreamSinkTest [(old|new) [#hops [#kb]]]
|
||||
*/
|
||||
public class StreamSinkTest {
|
||||
/* private static String HOST1 = "dev.i2p.net";
|
||||
private static String HOST2 = "dev.i2p.net";
|
||||
private static String PORT1 = "4101";
|
||||
private static String PORT2 = "4501";
|
||||
*/ /* */
|
||||
/*
|
||||
private static String HOST1 = "localhost";
|
||||
private static String HOST2 = "localhost";
|
||||
private static String PORT1 = "7654";
|
||||
private static String PORT2 = "7654";
|
||||
/* */ /*
|
||||
*/
|
||||
private static String HOST1 = "localhost";
|
||||
private static String HOST2 = "localhost";
|
||||
private static String PORT1 = "10001";
|
||||
private static String PORT2 = "11001";
|
||||
*/
|
||||
/* */
|
||||
|
||||
public static void main(String args[]) {
|
||||
boolean old = false;
|
||||
int hops = 0;
|
||||
int kb = 32*1024;
|
||||
if (args.length > 0) {
|
||||
if ("old".equals(args[0]))
|
||||
old = true;
|
||||
}
|
||||
if (args.length > 1) {
|
||||
try {
|
||||
hops = Integer.parseInt(args[1]);
|
||||
} catch (NumberFormatException nfe) {
|
||||
hops = 0;
|
||||
}
|
||||
}
|
||||
if (args.length > 2) {
|
||||
try {
|
||||
kb = Integer.parseInt(args[2]);
|
||||
} catch (NumberFormatException nfe) {
|
||||
kb = 32*1024;
|
||||
}
|
||||
}
|
||||
|
||||
if (!old)
|
||||
System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName());
|
||||
//System.setProperty("tunnels.depthInbound", "0");
|
||||
System.setProperty("tunnels.depthInbound", ""+hops);
|
||||
|
||||
new Thread(new Runnable() {
|
||||
public void run() {
|
||||
@ -32,10 +55,10 @@ public class StreamSinkTest {
|
||||
|
||||
try { Thread.sleep(60*1000); } catch (Exception e) {}
|
||||
|
||||
//run(256, 10000);
|
||||
//run(256, 1);
|
||||
//run(256, 1000);
|
||||
//run(1024, 10);
|
||||
run(32*1024, 1);
|
||||
//run(4*1024, 10);
|
||||
run(kb, 1);
|
||||
//run(1*1024, 1);
|
||||
//run("/home/jrandom/streamSinkTestDir/clientSink36766.dat", 1);
|
||||
//run(512*1024, 1);
|
||||
|
@ -1,4 +1,10 @@
|
||||
$Id: history.txt,v 1.60 2004/11/01 08:31:31 jrandom Exp $
|
||||
$Id: history.txt,v 1.61 2004/11/02 03:27:56 jrandom Exp $
|
||||
|
||||
2004-11-02 jrandom
|
||||
* Fix for a long standing synchronization bug in the JobQueue (and added
|
||||
some kooky flags to make sure it stays dead)
|
||||
* Update the ministreaming lib to force mode=guaranteed if the default
|
||||
lib is used, and mode=best_effort for all other libs.
|
||||
|
||||
2004-11-02 jrandom
|
||||
* Fixed up the configuration overrides for the streaming socket lib
|
||||
|
@ -153,6 +153,8 @@ public class I2NPMessageReader {
|
||||
return 0;
|
||||
} else {
|
||||
boolean shouldLag = _context.random().nextInt(1000) > size;
|
||||
if (!shouldLag) return 0;
|
||||
|
||||
long readLag = getReadLag();
|
||||
if (readLag > 0) {
|
||||
long lag = _context.random().nextLong(readLag);
|
||||
|
@ -132,7 +132,6 @@ public class JobQueue {
|
||||
if (job instanceof JobImpl)
|
||||
((JobImpl)job).addedToQueue();
|
||||
|
||||
boolean isReady = false;
|
||||
long numReady = 0;
|
||||
boolean alreadyExists = false;
|
||||
synchronized (_readyJobs) {
|
||||
@ -154,7 +153,9 @@ public class JobQueue {
|
||||
+ numReady + ": job = " + job);
|
||||
job.dropped();
|
||||
_context.statManager().addRateData("jobQueue.droppedJobs", 1, 1);
|
||||
awaken(1);
|
||||
synchronized (_readyJobs) {
|
||||
_readyJobs.notifyAll();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -166,7 +167,7 @@ public class JobQueue {
|
||||
((JobImpl)job).madeReady();
|
||||
synchronized (_readyJobs) {
|
||||
_readyJobs.add(job);
|
||||
isReady = true;
|
||||
_readyJobs.notifyAll();
|
||||
}
|
||||
} else {
|
||||
synchronized (_timedJobs) {
|
||||
@ -179,11 +180,6 @@ public class JobQueue {
|
||||
_log.debug("Not adding already enqueued job " + job.getName());
|
||||
}
|
||||
|
||||
if (isReady) {
|
||||
// wake up at most one runner
|
||||
awaken(1);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -246,11 +242,15 @@ public class JobQueue {
|
||||
}
|
||||
synchronized (_readyJobs) {
|
||||
_readyJobs.clear();
|
||||
_readyJobs.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
_alive = false;
|
||||
synchronized (_readyJobs) {
|
||||
_readyJobs.notifyAll();
|
||||
}
|
||||
if (_log.shouldLog(Log.WARN)) {
|
||||
StringBuffer buf = new StringBuffer(1024);
|
||||
buf.append("current jobs: \n");
|
||||
@ -339,31 +339,13 @@ public class JobQueue {
|
||||
*/
|
||||
Job getNext() {
|
||||
while (_alive) {
|
||||
Job rv = null;
|
||||
int ready = 0;
|
||||
synchronized (_readyJobs) {
|
||||
ready = _readyJobs.size();
|
||||
if (ready > 0)
|
||||
rv = (Job)_readyJobs.remove(0);
|
||||
}
|
||||
if (rv != null) {
|
||||
// we found one, but there may be more, so wake up enough
|
||||
// other runners
|
||||
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Waking up " + (ready-1) + " job runners (and running one)");
|
||||
//awaken(ready-1);
|
||||
return rv;
|
||||
if (_readyJobs.size() > 0) {
|
||||
return (Job)_readyJobs.remove(0);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("No jobs pending, waiting");
|
||||
try { _readyJobs.wait(); } catch (InterruptedException ie) {}
|
||||
}
|
||||
|
||||
try {
|
||||
synchronized (_runnerLock) {
|
||||
_runnerLock.wait();
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("No longer alive, returning null");
|
||||
@ -415,23 +397,6 @@ public class JobQueue {
|
||||
|
||||
void removeRunner(int id) { _queueRunners.remove(new Integer(id)); }
|
||||
|
||||
/**
|
||||
* Notify a sufficient number of waiting runners, and if necessary, increase
|
||||
* the number of runners (up to maxRunners)
|
||||
*
|
||||
*/
|
||||
private void awaken(int numMadeReady) {
|
||||
// notify a sufficient number of waiting runners
|
||||
//for (int i = 0; i < numMadeReady; i++) {
|
||||
// synchronized (_runnerLock) {
|
||||
// _runnerLock.notify();
|
||||
// }
|
||||
//}
|
||||
synchronized (_runnerLock) {
|
||||
_runnerLock.notify();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Responsible for moving jobs from the timed queue to the ready queue,
|
||||
* adjusting the number of queue runners, as well as periodically updating the
|
||||
@ -480,9 +445,8 @@ public class JobQueue {
|
||||
// on some profiling data ;)
|
||||
for (int i = 0; i < toAdd.size(); i++)
|
||||
_readyJobs.add(toAdd.get(i));
|
||||
_readyJobs.notifyAll();
|
||||
}
|
||||
|
||||
awaken(toAdd.size());
|
||||
} else {
|
||||
if (timeToWait < 100)
|
||||
timeToWait = 100;
|
||||
@ -609,16 +573,15 @@ public class JobQueue {
|
||||
ArrayList justFinishedJobs = new ArrayList(4);
|
||||
out.write("<!-- jobQueue rendering -->\n");
|
||||
out.flush();
|
||||
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
|
||||
out.write("<!-- jobQueue rendering: after readyJobs sync -->\n");
|
||||
out.flush();
|
||||
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
|
||||
out.write("<!-- jobQueue rendering: after timedJobs sync -->\n");
|
||||
out.flush();
|
||||
|
||||
int states[] = null;
|
||||
int numRunners = 0;
|
||||
synchronized (_queueRunners) {
|
||||
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) {
|
||||
states = new int[_queueRunners.size()];
|
||||
int i = 0;
|
||||
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); i++) {
|
||||
JobQueueRunner runner = (JobQueueRunner)iter.next();
|
||||
states[i] = runner.getState();
|
||||
Job job = runner.getCurrentJob();
|
||||
if (job != null) {
|
||||
activeJobs.add(job);
|
||||
@ -630,13 +593,28 @@ public class JobQueue {
|
||||
numRunners = _queueRunners.size();
|
||||
}
|
||||
|
||||
out.write("<!-- jobQueue rendering: after queueRunners sync -->\n");
|
||||
StringBuffer str = new StringBuffer(128);
|
||||
str.append("<!-- after queueRunner sync: states: ");
|
||||
for (int i = 0; states != null && i < states.length; i++)
|
||||
str.append(states[i]).append(" ");
|
||||
str.append(" -->\n");
|
||||
out.write(str.toString());
|
||||
out.flush();
|
||||
|
||||
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
|
||||
out.write("<!-- jobQueue rendering: after readyJobs sync -->\n");
|
||||
out.flush();
|
||||
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
|
||||
out.write("<!-- jobQueue rendering: after timedJobs sync -->\n");
|
||||
out.flush();
|
||||
|
||||
StringBuffer buf = new StringBuffer(32*1024);
|
||||
buf.append("<h2>JobQueue</h2>");
|
||||
buf.append("# runners: ").append(numRunners);
|
||||
buf.append("<br />\n");
|
||||
buf.append("# runners: ").append(numRunners).append(" [states=");
|
||||
if (states != null)
|
||||
for (int i = 0; i < states.length; i++)
|
||||
buf.append(states[i]).append(" ");
|
||||
buf.append("]<br />\n");
|
||||
|
||||
long now = _context.clock().now();
|
||||
|
||||
|
@ -13,6 +13,7 @@ class JobQueueRunner implements Runnable {
|
||||
private Job _lastJob;
|
||||
private long _lastBegin;
|
||||
private long _lastEnd;
|
||||
private int _state;
|
||||
|
||||
public JobQueueRunner(RouterContext context, int id) {
|
||||
_context = context;
|
||||
@ -26,8 +27,11 @@ class JobQueueRunner implements Runnable {
|
||||
_context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sat on the job queue?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_state = 1;
|
||||
}
|
||||
|
||||
final int getState() { return _state; }
|
||||
|
||||
public Job getCurrentJob() { return _currentJob; }
|
||||
public Job getLastJob() { return _lastJob; }
|
||||
public int getRunnerId() { return _id; }
|
||||
@ -36,12 +40,16 @@ class JobQueueRunner implements Runnable {
|
||||
public long getLastBegin() { return _lastBegin; }
|
||||
public long getLastEnd() { return _lastEnd; }
|
||||
public void run() {
|
||||
_state = 2;
|
||||
long lastActive = _context.clock().now();
|
||||
long jobNum = 0;
|
||||
while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
|
||||
_state = 3;
|
||||
try {
|
||||
Job job = _context.jobQueue().getNext();
|
||||
_state = 4;
|
||||
if (job == null) {
|
||||
_state = 5;
|
||||
if (_context.router().isAlive())
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("getNext returned null - dead?");
|
||||
@ -51,11 +59,14 @@ class JobQueueRunner implements Runnable {
|
||||
|
||||
long enqueuedTime = 0;
|
||||
if (job instanceof JobImpl) {
|
||||
_state = 6;
|
||||
long when = ((JobImpl)job).getMadeReadyOn();
|
||||
if (when <= 0) {
|
||||
_state = 7;
|
||||
_log.error("Job was not made ready?! " + job,
|
||||
new Exception("Not made ready?!"));
|
||||
} else {
|
||||
_state = 8;
|
||||
enqueuedTime = now - when;
|
||||
}
|
||||
}
|
||||
@ -63,16 +74,21 @@ class JobQueueRunner implements Runnable {
|
||||
long betweenJobs = now - lastActive;
|
||||
_currentJob = job;
|
||||
_lastJob = null;
|
||||
_state = 9;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName());
|
||||
long origStartAfter = job.getTiming().getStartAfter();
|
||||
long doStart = _context.clock().now();
|
||||
_state = 10;
|
||||
job.getTiming().start();
|
||||
runCurrentJob();
|
||||
job.getTiming().end();
|
||||
_state = 11;
|
||||
long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart();
|
||||
long beforeUpdate = _context.clock().now();
|
||||
_state = 12;
|
||||
_context.jobQueue().updateStats(job, doStart, origStartAfter, duration);
|
||||
_state = 13;
|
||||
long diff = _context.clock().now() - beforeUpdate;
|
||||
|
||||
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
|
||||
@ -80,6 +96,8 @@ class JobQueueRunner implements Runnable {
|
||||
_context.statManager().addRateData("jobQueue.jobLag", doStart - origStartAfter, 0);
|
||||
_context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime);
|
||||
|
||||
_state = 14;
|
||||
|
||||
if (diff > 100) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Updating statistics for the job took too long [" + diff + "ms]");
|
||||
@ -92,6 +110,7 @@ class JobQueueRunner implements Runnable {
|
||||
_currentJob = null;
|
||||
_lastEnd = lastActive;
|
||||
jobNum++;
|
||||
_state = 15;
|
||||
|
||||
//if ( (jobNum % 10) == 0)
|
||||
// System.gc();
|
||||
@ -100,17 +119,22 @@ class JobQueueRunner implements Runnable {
|
||||
_log.log(Log.CRIT, "WTF, error running?", t);
|
||||
}
|
||||
}
|
||||
_state = 16;
|
||||
if (_context.router().isAlive())
|
||||
if (_log.shouldLog(Log.CRIT))
|
||||
_log.log(Log.CRIT, "Queue runner " + _id + " exiting");
|
||||
_context.jobQueue().removeRunner(_id);
|
||||
_state = 17;
|
||||
}
|
||||
|
||||
private void runCurrentJob() {
|
||||
try {
|
||||
_state = 18;
|
||||
_lastBegin = _context.clock().now();
|
||||
_currentJob.runJob();
|
||||
_state = 19;
|
||||
} catch (OutOfMemoryError oom) {
|
||||
_state = 20;
|
||||
try {
|
||||
if (_log.shouldLog(Log.CRIT))
|
||||
_log.log(Log.CRIT, "Router ran out of memory, shutting down", oom);
|
||||
@ -122,6 +146,7 @@ class JobQueueRunner implements Runnable {
|
||||
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
|
||||
System.exit(-1);
|
||||
} catch (Throwable t) {
|
||||
_state = 21;
|
||||
if (_log.shouldLog(Log.CRIT))
|
||||
_log.log(Log.CRIT, "Error processing job [" + _currentJob.getName()
|
||||
+ "] on thread " + _id + ": " + t.getMessage(), t);
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.66 $ $Date: 2004/11/01 08:31:30 $";
|
||||
public final static String ID = "$Revision: 1.67 $ $Date: 2004/11/02 03:27:56 $";
|
||||
public final static String VERSION = "0.4.1.3";
|
||||
public final static long BUILD = 7;
|
||||
public final static long BUILD = 8;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
Reference in New Issue
Block a user