leave all threads at base priority (except the client runner, where we push at max)

don't consider a connection valid until it has been up for 30 seconds (so people who are simply establishing connections but whose nats are still messed up get the error)
when dealing with expired after accepted, dont drop unless it expired outside the fudge factor
increase the default maxWaitingJobs to 100, since we can get lots at once (and we dont gobble as much memory as we used to)
also, don't wake up the jobQueueRunner in getNext once a second, instead just let the threads updating the queue notify
This commit is contained in:
jrandom
2004-07-06 14:38:35 +00:00
committed by zzz
parent 49090014cc
commit dca66c8de8
6 changed files with 36 additions and 17 deletions

View File

@ -86,7 +86,7 @@ public class JobQueue {
/** max ready and waiting jobs before we start dropping 'em */ /** max ready and waiting jobs before we start dropping 'em */
private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS;
private final static int DEFAULT_MAX_WAITING_JOBS = 20; private final static int DEFAULT_MAX_WAITING_JOBS = 100;
private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs";
/** /**
@ -117,7 +117,7 @@ public class JobQueue {
I2PThread pumperThread = new I2PThread(_pumper); I2PThread pumperThread = new I2PThread(_pumper);
pumperThread.setDaemon(true); pumperThread.setDaemon(true);
pumperThread.setName("QueuePumper"); pumperThread.setName("QueuePumper");
pumperThread.setPriority(I2PThread.MIN_PRIORITY); //pumperThread.setPriority(I2PThread.MIN_PRIORITY);
pumperThread.start(); pumperThread.start();
} }
@ -286,12 +286,12 @@ public class JobQueue {
return rv; return rv;
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("No jobs pending, waiting a second"); _log.debug("No jobs pending, waiting");
} }
try { try {
synchronized (_runnerLock) { synchronized (_runnerLock) {
_runnerLock.wait(1000); _runnerLock.wait();
} }
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
@ -367,6 +367,7 @@ public class JobQueue {
_queueRunners.put(new Integer(i), runner); _queueRunners.put(new Integer(i), runner);
Thread t = new I2PThread(runner); Thread t = new I2PThread(runner);
t.setName("JobQueue"+(_runnerId++)); t.setName("JobQueue"+(_runnerId++));
//t.setPriority(I2PThread.MAX_PRIORITY-1);
t.setDaemon(false); t.setDaemon(false);
t.start(); t.start();
} }
@ -661,7 +662,7 @@ public class JobQueue {
public String renderStatusHTML() { public String renderStatusHTML() {
ArrayList readyJobs = null; ArrayList readyJobs = null;
ArrayList timedJobs = null; ArrayList timedJobs = null;
ArrayList activeJobs = new ArrayList(4); ArrayList activeJobs = new ArrayList(1);
ArrayList justFinishedJobs = new ArrayList(4); ArrayList justFinishedJobs = new ArrayList(4);
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
@ -670,10 +671,10 @@ public class JobQueue {
JobQueueRunner runner = (JobQueueRunner)iter.next(); JobQueueRunner runner = (JobQueueRunner)iter.next();
Job job = runner.getCurrentJob(); Job job = runner.getCurrentJob();
if (job != null) { if (job != null) {
activeJobs.add(job.getName()); activeJobs.add(job);
} else { } else {
job = runner.getLastJob(); job = runner.getLastJob();
justFinishedJobs.add(job.getName()); justFinishedJobs.add(job);
} }
} }
} }
@ -684,19 +685,28 @@ public class JobQueue {
buf.append(_queueRunners.size()); buf.append(_queueRunners.size());
} }
buf.append("<br />\n"); buf.append("<br />\n");
long now = _context.clock().now();
buf.append("# active jobs: ").append(activeJobs.size()).append("<ol>\n"); buf.append("# active jobs: ").append(activeJobs.size()).append("<ol>\n");
for (int i = 0; i < activeJobs.size(); i++) { for (int i = 0; i < activeJobs.size(); i++) {
buf.append("<li>").append(activeJobs.get(i)).append("</li>\n"); Job j = (Job)activeJobs.get(i);
buf.append("<li> [started ").append(now-j.getTiming().getStartAfter()).append("ms ago]: ");
buf.append(j.toString()).append("</li>\n");
} }
buf.append("</ol>\n"); buf.append("</ol>\n");
buf.append("# just finished jobs: ").append(justFinishedJobs.size()).append("<ol>\n"); buf.append("# just finished jobs: ").append(justFinishedJobs.size()).append("<ol>\n");
for (int i = 0; i < justFinishedJobs.size(); i++) { for (int i = 0; i < justFinishedJobs.size(); i++) {
buf.append("<li>").append(justFinishedJobs.get(i)).append("</li>\n"); Job j = (Job)justFinishedJobs.get(i);
buf.append("<li> [finished ").append(now-j.getTiming().getActualEnd()).append("ms ago]: ");
buf.append(j.toString()).append("</li>\n");
} }
buf.append("</ol>\n"); buf.append("</ol>\n");
buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" <i>(lots of these mean there's likely a big problem)</i><ol>\n"); buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" <i>(lots of these mean there's likely a big problem)</i><ol>\n");
for (int i = 0; i < readyJobs.size(); i++) { for (int i = 0; i < readyJobs.size(); i++) {
buf.append("<li>").append(readyJobs.get(i)).append("</li>\n"); Job j = (Job)readyJobs.get(i);
buf.append("<li> [waiting ").append(now-j.getTiming().getStartAfter()).append("ms]: ");
buf.append(j.toString()).append("</li>\n");
} }
buf.append("</ol>\n"); buf.append("</ol>\n");

View File

@ -97,7 +97,7 @@ public class AdminListener implements Runnable {
AdminRunner runner = new AdminRunner(_context, socket); AdminRunner runner = new AdminRunner(_context, socket);
I2PThread t = new I2PThread(runner); I2PThread t = new I2PThread(runner);
t.setName("Admin Runner"); t.setName("Admin Runner");
t.setPriority(Thread.MIN_PRIORITY); //t.setPriority(Thread.MIN_PRIORITY);
t.setDaemon(true); t.setDaemon(true);
t.start(); t.start();
} }

View File

@ -48,7 +48,7 @@ public class AdminManager implements Service {
I2PThread t = new I2PThread(_listener); I2PThread t = new I2PThread(_listener);
t.setName("Admin Listener:" + port); t.setName("Admin Listener:" + port);
t.setDaemon(true); t.setDaemon(true);
t.setPriority(Thread.MIN_PRIORITY); //t.setPriority(Thread.MIN_PRIORITY);
t.start(); t.start();
} }
} }

View File

@ -31,6 +31,7 @@ import net.i2p.router.ClientMessage;
import net.i2p.router.InNetMessage; import net.i2p.router.InNetMessage;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.MessageReceptionInfo; import net.i2p.router.MessageReceptionInfo;
import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo; import net.i2p.router.TunnelInfo;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -64,18 +65,25 @@ public class HandleTunnelMessageJob extends JobImpl {
TunnelId id = _message.getTunnelId(); TunnelId id = _message.getTunnelId();
long excessLag = _context.clock().now() - _message.getMessageExpiration().getTime(); long excessLag = _context.clock().now() - _message.getMessageExpiration().getTime();
if (excessLag > 0) { if (excessLag > Router.CLOCK_FUDGE_FACTOR) {
// expired while on the queue // expired while on the queue
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Accepted message (" + _message.getUniqueId() + ") expired on the queue for tunnel " _log.warn("Accepted message (" + _message.getUniqueId() + ") expired on the queue for tunnel "
+ id.getTunnelId() + " expiring " + id.getTunnelId() + " expiring "
+ (_context.clock().now() - _message.getMessageExpiration().getTime()) + excessLag
+ "ms ago"); + "ms ago");
_context.statManager().addRateData("tunnel.expiredAfterAcceptTime", excessLag, excessLag); _context.statManager().addRateData("tunnel.expiredAfterAcceptTime", excessLag, excessLag);
_context.messageHistory().messageProcessingError(_message.getUniqueId(), _context.messageHistory().messageProcessingError(_message.getUniqueId(),
TunnelMessage.class.getName(), TunnelMessage.class.getName(),
"tunnel message expired on the queue"); "tunnel message expired on the queue");
return; return;
} else if (excessLag > 0) {
// almost expired while on the queue
if (_log.shouldLog(Log.WARN))
_log.warn("Accepted message (" + _message.getUniqueId() + ") *almost* expired on the queue for tunnel "
+ id.getTunnelId() + " expiring "
+ excessLag
+ "ms ago");
} }
TunnelInfo info = _context.tunnelManager().getTunnelInfo(id); TunnelInfo info = _context.tunnelManager().getTunnelInfo(id);

View File

@ -224,7 +224,7 @@ class RestrictiveTCPConnection extends TCPConnection {
I2PThread sockCreator = new I2PThread(creator); I2PThread sockCreator = new I2PThread(creator);
sockCreator.setDaemon(true); sockCreator.setDaemon(true);
sockCreator.setName("PeerCallback:" + _transport.getListenPort()); sockCreator.setName("PeerCallback:" + _transport.getListenPort());
sockCreator.setPriority(I2PThread.MIN_PRIORITY); //sockCreator.setPriority(I2PThread.MIN_PRIORITY);
sockCreator.start(); sockCreator.start();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@ -237,7 +237,7 @@ public class TCPTransport extends TransportImpl {
I2PThread sockCreator = new I2PThread(creator); I2PThread sockCreator = new I2PThread(creator);
sockCreator.setDaemon(true); sockCreator.setDaemon(true);
sockCreator.setName("SocketCreator_:" + _listenPort); sockCreator.setName("SocketCreator_:" + _listenPort);
sockCreator.setPriority(I2PThread.MIN_PRIORITY); //sockCreator.setPriority(I2PThread.MIN_PRIORITY);
sockCreator.start(); sockCreator.start();
try { try {
@ -536,7 +536,7 @@ public class TCPTransport extends TransportImpl {
String lifetime = null; String lifetime = null;
for (int i = 0; i < curCons.size(); i++) { for (int i = 0; i < curCons.size(); i++) {
TCPConnection con = (TCPConnection)curCons.get(i); TCPConnection con = (TCPConnection)curCons.get(i);
if (con.getLifetime() > 0) { if (con.getLifetime() > 30*1000) {
established++; established++;
lifetime = DataHelper.formatDuration(con.getLifetime()); lifetime = DataHelper.formatDuration(con.getLifetime());
} }
@ -545,6 +545,7 @@ public class TCPTransport extends TransportImpl {
buf.append(lifetime); buf.append(lifetime);
else else
buf.append("[pending]"); buf.append("[pending]");
buf.append("</li>\n"); buf.append("</li>\n");
} }
buf.append("</ul>\n"); buf.append("</ul>\n");