diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index fa3465a5c8..d261387d5f 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -86,7 +86,7 @@ public class JobQueue { /** max ready and waiting jobs before we start dropping 'em */ private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; - private final static int DEFAULT_MAX_WAITING_JOBS = 20; + private final static int DEFAULT_MAX_WAITING_JOBS = 100; private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; /** @@ -117,7 +117,7 @@ public class JobQueue { I2PThread pumperThread = new I2PThread(_pumper); pumperThread.setDaemon(true); pumperThread.setName("QueuePumper"); - pumperThread.setPriority(I2PThread.MIN_PRIORITY); + //pumperThread.setPriority(I2PThread.MIN_PRIORITY); pumperThread.start(); } @@ -286,12 +286,12 @@ public class JobQueue { return rv; } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("No jobs pending, waiting a second"); + _log.debug("No jobs pending, waiting"); } try { synchronized (_runnerLock) { - _runnerLock.wait(1000); + _runnerLock.wait(); } } catch (InterruptedException ie) {} } @@ -367,6 +367,7 @@ public class JobQueue { _queueRunners.put(new Integer(i), runner); Thread t = new I2PThread(runner); t.setName("JobQueue"+(_runnerId++)); + //t.setPriority(I2PThread.MAX_PRIORITY-1); t.setDaemon(false); t.start(); } @@ -661,7 +662,7 @@ public class JobQueue { public String renderStatusHTML() { ArrayList readyJobs = null; ArrayList timedJobs = null; - ArrayList activeJobs = new ArrayList(4); + ArrayList activeJobs = new ArrayList(1); ArrayList justFinishedJobs = new ArrayList(4); synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } @@ -670,10 +671,10 @@ public class JobQueue { JobQueueRunner runner = (JobQueueRunner)iter.next(); Job job = runner.getCurrentJob(); if (job != null) { - activeJobs.add(job.getName()); + activeJobs.add(job); } else { job = runner.getLastJob(); - justFinishedJobs.add(job.getName()); + justFinishedJobs.add(job); } } } @@ -684,19 +685,28 @@ public class JobQueue { buf.append(_queueRunners.size()); } buf.append("
\n"); + + long now = _context.clock().now(); + buf.append("# active jobs: ").append(activeJobs.size()).append("
    \n"); for (int i = 0; i < activeJobs.size(); i++) { - buf.append("
  1. ").append(activeJobs.get(i)).append("
  2. \n"); + Job j = (Job)activeJobs.get(i); + buf.append("
  3. [started ").append(now-j.getTiming().getStartAfter()).append("ms ago]: "); + buf.append(j.toString()).append("
  4. \n"); } buf.append("
\n"); buf.append("# just finished jobs: ").append(justFinishedJobs.size()).append("
    \n"); for (int i = 0; i < justFinishedJobs.size(); i++) { - buf.append("
  1. ").append(justFinishedJobs.get(i)).append("
  2. \n"); + Job j = (Job)justFinishedJobs.get(i); + buf.append("
  3. [finished ").append(now-j.getTiming().getActualEnd()).append("ms ago]: "); + buf.append(j.toString()).append("
  4. \n"); } buf.append("
\n"); buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" (lots of these mean there's likely a big problem)
    \n"); for (int i = 0; i < readyJobs.size(); i++) { - buf.append("
  1. ").append(readyJobs.get(i)).append("
  2. \n"); + Job j = (Job)readyJobs.get(i); + buf.append("
  3. [waiting ").append(now-j.getTiming().getStartAfter()).append("ms]: "); + buf.append(j.toString()).append("
  4. \n"); } buf.append("
\n"); diff --git a/router/java/src/net/i2p/router/admin/AdminListener.java b/router/java/src/net/i2p/router/admin/AdminListener.java index 77cc818761..92b8bbcc39 100644 --- a/router/java/src/net/i2p/router/admin/AdminListener.java +++ b/router/java/src/net/i2p/router/admin/AdminListener.java @@ -97,7 +97,7 @@ public class AdminListener implements Runnable { AdminRunner runner = new AdminRunner(_context, socket); I2PThread t = new I2PThread(runner); t.setName("Admin Runner"); - t.setPriority(Thread.MIN_PRIORITY); + //t.setPriority(Thread.MIN_PRIORITY); t.setDaemon(true); t.start(); } diff --git a/router/java/src/net/i2p/router/admin/AdminManager.java b/router/java/src/net/i2p/router/admin/AdminManager.java index 0d916bcbf7..39ddfa0b70 100644 --- a/router/java/src/net/i2p/router/admin/AdminManager.java +++ b/router/java/src/net/i2p/router/admin/AdminManager.java @@ -48,7 +48,7 @@ public class AdminManager implements Service { I2PThread t = new I2PThread(_listener); t.setName("Admin Listener:" + port); t.setDaemon(true); - t.setPriority(Thread.MIN_PRIORITY); + //t.setPriority(Thread.MIN_PRIORITY); t.start(); } } diff --git a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java index d426f6c35a..d40ee21551 100644 --- a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java @@ -31,6 +31,7 @@ import net.i2p.router.ClientMessage; import net.i2p.router.InNetMessage; import net.i2p.router.JobImpl; import net.i2p.router.MessageReceptionInfo; +import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; import net.i2p.util.Log; @@ -64,18 +65,25 @@ public class HandleTunnelMessageJob extends JobImpl { TunnelId id = _message.getTunnelId(); long excessLag = _context.clock().now() - _message.getMessageExpiration().getTime(); - if (excessLag > 0) { + if (excessLag > Router.CLOCK_FUDGE_FACTOR) { // expired while on the queue if (_log.shouldLog(Log.WARN)) _log.warn("Accepted message (" + _message.getUniqueId() + ") expired on the queue for tunnel " + id.getTunnelId() + " expiring " - + (_context.clock().now() - _message.getMessageExpiration().getTime()) + + excessLag + "ms ago"); _context.statManager().addRateData("tunnel.expiredAfterAcceptTime", excessLag, excessLag); _context.messageHistory().messageProcessingError(_message.getUniqueId(), TunnelMessage.class.getName(), "tunnel message expired on the queue"); return; + } else if (excessLag > 0) { + // almost expired while on the queue + if (_log.shouldLog(Log.WARN)) + _log.warn("Accepted message (" + _message.getUniqueId() + ") *almost* expired on the queue for tunnel " + + id.getTunnelId() + " expiring " + + excessLag + + "ms ago"); } TunnelInfo info = _context.tunnelManager().getTunnelInfo(id); diff --git a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java index 47ec67d2fd..e555ceac5b 100644 --- a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java @@ -224,7 +224,7 @@ class RestrictiveTCPConnection extends TCPConnection { I2PThread sockCreator = new I2PThread(creator); sockCreator.setDaemon(true); sockCreator.setName("PeerCallback:" + _transport.getListenPort()); - sockCreator.setPriority(I2PThread.MIN_PRIORITY); + //sockCreator.setPriority(I2PThread.MIN_PRIORITY); sockCreator.start(); if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index b5e49a601f..bfa3811809 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -237,7 +237,7 @@ public class TCPTransport extends TransportImpl { I2PThread sockCreator = new I2PThread(creator); sockCreator.setDaemon(true); sockCreator.setName("SocketCreator_:" + _listenPort); - sockCreator.setPriority(I2PThread.MIN_PRIORITY); + //sockCreator.setPriority(I2PThread.MIN_PRIORITY); sockCreator.start(); try { @@ -536,7 +536,7 @@ public class TCPTransport extends TransportImpl { String lifetime = null; for (int i = 0; i < curCons.size(); i++) { TCPConnection con = (TCPConnection)curCons.get(i); - if (con.getLifetime() > 0) { + if (con.getLifetime() > 30*1000) { established++; lifetime = DataHelper.formatDuration(con.getLifetime()); } @@ -545,6 +545,7 @@ public class TCPTransport extends TransportImpl { buf.append(lifetime); else buf.append("[pending]"); + buf.append("\n"); } buf.append("\n");