2004-10-09 jrandom

* Added a watchdog timer to do some baseline liveliness checking to help
      debug some odd errors.
    * Added a pair of summary stats for bandwidth usage, allowing easy export
      with the other stats ("bw.sendBps" and "bw.receiveBps")
    * Trimmed another memory allocation on message reception.
This commit is contained in:
jrandom
2004-10-10 00:03:25 +00:00
committed by zzz
parent 65676f8988
commit 8dc261da79
15 changed files with 183 additions and 14 deletions

View File

@ -1,4 +1,11 @@
$Id: history.txt,v 1.40 2004/10/08 13:38:49 jrandom Exp $
$Id: history.txt,v 1.41 2004/10/08 17:53:03 jrandom Exp $
2004-10-09 jrandom
* Added a watchdog timer to do some baseline liveliness checking to help
debug some odd errors.
* Added a pair of summary stats for bandwidth usage, allowing easy export
with the other stats ("bw.sendBps" and "bw.receiveBps")
* Trimmed another memory allocation on message reception.
2004-10-08 jrandom
* Revamp the AESInputStream so it doesn't allocate any temporary objects

View File

@ -29,11 +29,12 @@ public interface I2NPMessage extends DataStructure {
* @param in stream to read from
* @param type I2NP message type
* @param buffer scratch buffer to be used when reading and parsing
* @return size of the message read (including headers)
* @throws I2NPMessageException if the stream doesn't contain a valid message
* that this class can read.
* @throws IOException if there is a problem reading from the stream
*/
public void readBytes(InputStream in, int type, byte buffer[]) throws I2NPMessageException, IOException;
public int readBytes(InputStream in, int type, byte buffer[]) throws I2NPMessageException, IOException;
/**
* Read the body into the data structures, after the initial type byte and

View File

@ -26,11 +26,13 @@ public class I2NPMessageHandler {
private I2PAppContext _context;
private long _lastReadBegin;
private long _lastReadEnd;
private int _lastSize;
private byte _messageBuffer[];
public I2NPMessageHandler(I2PAppContext context) {
_context = context;
_log = context.logManager().getLog(I2NPMessageHandler.class);
_messageBuffer = null;
_lastSize = -1;
}
/**
@ -49,7 +51,7 @@ public class I2NPMessageHandler {
if (msg == null)
throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message");
try {
msg.readBytes(in, type, _messageBuffer);
_lastSize = msg.readBytes(in, type, _messageBuffer);
} catch (IOException ioe) {
throw ioe;
} catch (I2NPMessageException ime) {
@ -67,6 +69,7 @@ public class I2NPMessageHandler {
}
}
public long getLastReadTime() { return _lastReadEnd - _lastReadBegin; }
public int getLastSize() { return _lastSize; }
/**
* Yes, this is fairly ugly, but its the only place it ever happens.

View File

@ -51,7 +51,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
throw new DataFormatException("Bad bytes", ime);
}
}
public void readBytes(InputStream in, int type, byte buffer[]) throws I2NPMessageException, IOException {
public int readBytes(InputStream in, int type, byte buffer[]) throws I2NPMessageException, IOException {
try {
if (type < 0)
type = (int)DataHelper.readLong(in, 1);
@ -85,6 +85,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
long time = _context.clock().now() - start;
if (time > 50)
_context.statManager().addRateData("i2np.readTime", time, time);
return size + Hash.HASH_LENGTH + 1 + 4 + DataHelper.DATE_LENGTH;
} catch (DataFormatException dfe) {
throw new I2NPMessageException("Error reading the message header", dfe);
}

View File

@ -81,7 +81,7 @@ public class I2NPMessageReader {
* reader
*
*/
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead);
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead, int bytesRead);
/**
* Notify the listener that an exception was thrown while reading from the given
* reader
@ -122,7 +122,8 @@ public class I2NPMessageReader {
I2NPMessage msg = _handler.readMessage(_stream);
if (msg != null) {
long msToRead = _handler.getLastReadTime();
_listener.messageReceived(I2NPMessageReader.this, msg, msToRead);
int bytesRead = _handler.getLastSize();
_listener.messageReceived(I2NPMessageReader.this, msg, msToRead, bytesRead);
}
} catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.WARN))

View File

@ -65,6 +65,8 @@ public abstract class ClientManagerFacade implements Service {
public abstract void messageReceived(ClientMessage msg);
public boolean verifyClientLiveliness() { return true; }
/**
* Return the client's current config, or null if not connected
*

View File

@ -289,6 +289,37 @@ public class JobQueue {
}
boolean isAlive() { return _alive; }
/**
* When did the most recently begin job start?
*/
public long getLastJobBegin() {
long when = -1;
// not synchronized, so might b0rk if the runners are changed
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
long cur = ((JobQueueRunner)iter.next()).getLastBegin();
if (cur > when)
cur = when;
}
return when;
}
/**
* retrieve the most recently begin and still currently active job, or null if
* no jobs are running
*/
public Job getLastJob() {
Job j = null;
long when = -1;
// not synchronized, so might b0rk if the runners are changed
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
JobQueueRunner cur = (JobQueueRunner)iter.next();
if (cur.getLastBegin() > when) {
j = cur.getCurrentJob();
when = cur.getLastBegin();
}
}
return j;
}
/**
* Blocking call to retrieve the next ready job
*

View File

@ -11,6 +11,7 @@ class JobQueueRunner implements Runnable {
private long _numJobs;
private Job _currentJob;
private Job _lastJob;
private long _lastBegin;
public JobQueueRunner(RouterContext context, int id) {
_context = context;
@ -31,6 +32,7 @@ class JobQueueRunner implements Runnable {
public int getRunnerId() { return _id; }
public void stopRunning() { _keepRunning = false; }
public void startRunning() { _keepRunning = true; }
public long getLastBegin() { return _lastBegin; }
public void run() {
long lastActive = _context.clock().now();
long jobNum = 0;
@ -103,6 +105,7 @@ class JobQueueRunner implements Runnable {
private void runCurrentJob() {
try {
_lastBegin = _context.clock().now();
_currentJob.runJob();
} catch (OutOfMemoryError oom) {
try {

View File

@ -127,6 +127,11 @@ public class Router {
_gracefulShutdownDetector.setName("Graceful shutdown hook");
_gracefulShutdownDetector.start();
I2PThread watchdog = new I2PThread(new RouterWatchdog(_context));
watchdog.setName("RouterWatchdog");
watchdog.setDaemon(true);
watchdog.start();
_shutdownTasks = new HashSet(0);
}
@ -260,10 +265,35 @@ public class Router {
*
*/
private final class CoallesceStatsJob extends JobImpl {
public CoallesceStatsJob() { super(Router.this._context); }
public CoallesceStatsJob() {
super(Router.this._context);
Router.this._context.statManager().createRateStat("bw.receiveBps", "How fast we receive data", "Bandwidth", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
Router.this._context.statManager().createRateStat("bw.sendBps", "How fast we send data", "Bandwidth", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
}
public String getName() { return "Coallesce stats"; }
public void runJob() {
Router.this._context.statManager().coallesceStats();
RateStat receiveRate = _context.statManager().getRate("transport.receiveMessageSize");
if (receiveRate != null) {
Rate rate = receiveRate.getRate(60*1000);
if (rate != null) {
double bytes = rate.getLastTotalValue();
double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d);
Router.this._context.statManager().addRateData("bw.receiveBps", (long)bps, 60*1000);
}
}
RateStat sendRate = _context.statManager().getRate("transport.sendMessageSize");
if (sendRate != null) {
Rate rate = receiveRate.getRate(60*1000);
if (rate != null) {
double bytes = rate.getLastTotalValue();
double bps = (bytes*1000.0d)/(rate.getPeriod()*1024.0d);
Router.this._context.statManager().addRateData("bw.sendBps", (long)bps, 60*1000);
}
}
requeue(60*1000);
}
}

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.48 $ $Date: 2004/10/08 13:38:48 $";
public final static String ID = "$Revision: 1.49 $ $Date: 2004/10/08 17:53:04 $";
public final static String VERSION = "0.4.1.1";
public final static long BUILD = 14;
public final static long BUILD = 15;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -0,0 +1,67 @@
package net.i2p.router;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
* Periodically check to make sure things haven't gone totally haywire (and if
* they have, restart the JVM)
*
*/
class RouterWatchdog implements Runnable {
private Log _log;
private RouterContext _context;
private static final long MAX_JOB_RUN_LAG = 60*1000;
public RouterWatchdog(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(RouterWatchdog.class);
}
public boolean verifyJobQueueLiveliness() {
long when = _context.jobQueue().getLastJobBegin();
if (when < 0)
return true;
long howLongAgo = _context.clock().now() - when;
if (howLongAgo > MAX_JOB_RUN_LAG) {
Job cur = _context.jobQueue().getLastJob();
if (cur != null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Last job was queued up " + DataHelper.formatDuration(howLongAgo)
+ " ago: " + cur);
return false;
} else {
// no prob, just normal lag
return true;
}
} else {
return true;
}
}
public boolean verifyClientLiveliness() {
return _context.clientManager().verifyClientLiveliness();
}
private boolean shutdownOnHang() {
return true;
}
public void run() {
while (true) {
try { Thread.sleep(60*1000); } catch (InterruptedException ie) {}
monitorRouter();
}
}
public void monitorRouter() {
boolean ok = verifyJobQueueLiveliness();
ok = ok && verifyClientLiveliness();
if (!ok && shutdownOnHang()) {
_log.log(Log.CRIT, "Router hung! hard restart!");
System.exit(Router.EXIT_HARD_RESTART);
}
}
}

View File

@ -234,7 +234,7 @@ public class ClientManager {
return false;
}
private ClientConnectionRunner getRunner(Destination dest) {
ClientConnectionRunner getRunner(Destination dest) {
ClientConnectionRunner rv = null;
long beforeLock = _context.clock().now();
long inLock = 0;
@ -300,7 +300,7 @@ public class ClientManager {
}
}
private Set getRunnerDestinations() {
Set getRunnerDestinations() {
Set dests = new HashSet();
long beforeLock = _context.clock().now();
long inLock = 0;

View File

@ -10,7 +10,9 @@ package net.i2p.router.client;
import java.io.IOException;
import java.io.Writer;
import java.util.Iterator;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
@ -68,6 +70,27 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade {
startup();
}
private static final long MAX_TIME_TO_REBUILD = 5*60*1000;
public boolean verifyClientLiveliness() {
boolean lively = true;
for (Iterator iter = _manager.getRunnerDestinations().iterator(); iter.hasNext(); ) {
Destination dest = (Destination)iter.next();
ClientConnectionRunner runner = _manager.getRunner(dest);
if ( (runner == null) || (runner.getIsDead())) continue;
LeaseSet ls = runner.getLeaseSet();
if (ls == null)
continue; // still building
long howLongAgo = _context.clock().now() - ls.getEarliestLeaseDate();
if (howLongAgo > MAX_TIME_TO_REBUILD) {
if (_log.shouldLog(Log.ERROR))
_log.error("Client " + dest.calculateHash().toBase64().substring(0,6)
+ " has a leaseSet that expired " + DataHelper.formatDuration(howLongAgo));
lively = false;
}
}
return lively;
}
/**
* Request that a particular client authorize the Leases contained in the
* LeaseSet, after which the onCreateJob is queued up. If that doesn't occur

View File

@ -29,12 +29,12 @@ public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListene
_con.closeConnection();
}
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead) {
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead, int size) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Just received message " + message.getUniqueId() + " from "
+ _identHash.toBase64().substring(0,6)
+ " readTime = " + msToRead + "ms type = " + message.getClass().getName());
_transport.messageReceived(message, _ident, _identHash, msToRead, message.getMessageSize());
_transport.messageReceived(message, _ident, _identHash, msToRead, size);
}
public void readError(I2NPMessageReader reader, Exception error) {

View File

@ -69,7 +69,7 @@ class I2NPMessageReaderTest implements I2NPMessageReader.I2NPMessageEventListene
_log.debug("Disconnected");
}
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead) {
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead, int size) {
_log.debug("Message received: " + message);
}