big ol' memory, cpu usage, and shutdown handling update. main changes include:

* rather than have all jobs created hooked into the clock for offset updates, have the jobQueue stay hooked up and update any active jobs accordingly (killing a memory leak of JobTiming objects — one per job)
* don't go totally insane during shutdown and log like mad (though the clientApp things still log like mad, since they don't know the router is going down)
* adjust memory buffer sizes based on real world values so we don't have to expand/contract a lot
* don't display things that are completely useless (who cares what the first 32 bytes of a public key are?)
* reduce temporary object creation
* use more efficient collections at times
* on shutdown, log some state information (ready/timed jobs, pending messages, etc)
* explicit GC every 10 jobs. Yeah, not efficient, but just for now we'll keep it in there
* only reread the router config file if it changes (duh)
This commit is contained in:
jrandom
2004-05-16 04:54:50 +00:00
committed by zzz
parent 8c6bf5a1cc
commit ff0023a889
32 changed files with 195 additions and 99 deletions

View File

@ -121,6 +121,8 @@ public class I2PTunnelRunner extends I2PThread {
} catch (IOException ex) { } catch (IOException ex) {
ex.printStackTrace(); ex.printStackTrace();
_log.debug("Error forwarding", ex); _log.debug("Error forwarding", ex);
} catch (Exception e) {
_log.error("Internal error", e);
} finally { } finally {
try { try {
if (s != null) s.close(); if (s != null) s.close();

View File

@ -6,7 +6,7 @@ public class ByteCollector {
int size; int size;
public ByteCollector() { public ByteCollector() {
contents = new byte[80]; contents = new byte[1024];
size = 0; size = 0;
} }

View File

@ -69,10 +69,10 @@ public class PrivateKey extends DataStructureImpl {
buf.append("null key"); buf.append("null key");
} else { } else {
buf.append("size: ").append(_data.length); buf.append("size: ").append(_data.length);
int len = 32; //int len = 32;
if (len > _data.length) len = _data.length; //if (len > _data.length) len = _data.length;
buf.append(" first ").append(len).append(" bytes: "); //buf.append(" first ").append(len).append(" bytes: ");
buf.append(DataHelper.toString(_data, len)); //buf.append(DataHelper.toString(_data, len));
} }
buf.append("]"); buf.append("]");
return buf.toString(); return buf.toString();

View File

@ -68,10 +68,10 @@ public class PublicKey extends DataStructureImpl {
buf.append("null key"); buf.append("null key");
} else { } else {
buf.append("size: ").append(_data.length); buf.append("size: ").append(_data.length);
int len = 32; //int len = 32;
if (len > _data.length) len = _data.length; //if (len > _data.length) len = _data.length;
buf.append(" first ").append(len).append(" bytes: "); //buf.append(" first ").append(len).append(" bytes: ");
buf.append(DataHelper.toString(_data, len)); //buf.append(DataHelper.toString(_data, len));
} }
buf.append("]"); buf.append("]");
return buf.toString(); return buf.toString();

View File

@ -422,7 +422,7 @@ public class RouterInfo extends DataStructureImpl {
public String toString() { public String toString() {
if (_stringified != null) return _stringified; if (_stringified != null) return _stringified;
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(5*1024);
buf.append("[RouterInfo: "); buf.append("[RouterInfo: ");
buf.append("\n\tIdentity: ").append(getIdentity()); buf.append("\n\tIdentity: ").append(getIdentity());
buf.append("\n\tSignature: ").append(getSignature()); buf.append("\n\tSignature: ").append(getSignature());

View File

@ -67,10 +67,10 @@ public class SessionKey extends DataStructureImpl {
buf.append("null key"); buf.append("null key");
} else { } else {
buf.append("size: ").append(_data.length); buf.append("size: ").append(_data.length);
int len = 32; //int len = 32;
if (len > _data.length) len = _data.length; //if (len > _data.length) len = _data.length;
buf.append(" first ").append(len).append(" bytes: "); //buf.append(" first ").append(len).append(" bytes: ");
buf.append(DataHelper.toString(_data, len)); //buf.append(DataHelper.toString(_data, len));
} }
buf.append("]"); buf.append("]");
return buf.toString(); return buf.toString();

View File

@ -73,10 +73,10 @@ public class Signature extends DataStructureImpl {
buf.append("null signature"); buf.append("null signature");
} else { } else {
buf.append("size: ").append(_data.length); buf.append("size: ").append(_data.length);
int len = 32; //int len = 32;
if (len > _data.length) len = _data.length; //if (len > _data.length) len = _data.length;
buf.append(" first ").append(len).append(" bytes: "); //buf.append(" first ").append(len).append(" bytes: ");
buf.append(DataHelper.toString(_data, len)); //buf.append(DataHelper.toString(_data, len));
} }
buf.append("]"); buf.append("]");
return buf.toString(); return buf.toString();

View File

@ -69,10 +69,10 @@ public class SigningPrivateKey extends DataStructureImpl {
buf.append("null key"); buf.append("null key");
} else { } else {
buf.append("size: ").append(_data.length); buf.append("size: ").append(_data.length);
int len = 32; //int len = 32;
if (len > _data.length) len = _data.length; //if (len > _data.length) len = _data.length;
buf.append(" first ").append(len).append(" bytes: "); //buf.append(" first ").append(len).append(" bytes: ");
buf.append(DataHelper.toString(_data, len)); //buf.append(DataHelper.toString(_data, len));
} }
buf.append("]"); buf.append("]");
return buf.toString(); return buf.toString();

View File

@ -69,10 +69,10 @@ public class SigningPublicKey extends DataStructureImpl {
buf.append("null key"); buf.append("null key");
} else { } else {
buf.append("size: ").append(_data.length); buf.append("size: ").append(_data.length);
int len = 32; //int len = 32;
if (len > _data.length) len = _data.length; //if (len > _data.length) len = _data.length;
buf.append(" first ").append(len).append(" bytes: "); //buf.append(" first ").append(len).append(" bytes: ");
buf.append(DataHelper.toString(_data, len)); //buf.append(DataHelper.toString(_data, len));
} }
buf.append("]"); buf.append("]");
return buf.toString(); return buf.toString();

View File

@ -350,7 +350,7 @@ public class Rate {
} }
public void store(OutputStream out, String prefix) throws IOException { public void store(OutputStream out, String prefix) throws IOException {
StringBuffer buf = new StringBuffer(2048); StringBuffer buf = new StringBuffer(16*1048);
PersistenceHelper.add(buf, prefix, ".period", "Number of milliseconds in the period", _period); PersistenceHelper.add(buf, prefix, ".period", "Number of milliseconds in the period", _period);
PersistenceHelper.add(buf, prefix, ".creationDate", PersistenceHelper.add(buf, prefix, ".creationDate",
"When was this rate created? (milliseconds since the epoch, GMT)", _creationDate); "When was this rate created? (milliseconds since the epoch, GMT)", _creationDate);

View File

@ -76,7 +76,7 @@ public class RateStat {
private final static String NL = System.getProperty("line.separator"); private final static String NL = System.getProperty("line.separator");
public String toString() { public String toString() {
StringBuffer buf = new StringBuffer(512); StringBuffer buf = new StringBuffer(4096);
buf.append(getGroupName()).append('.').append(getName()).append(": ").append(getDescription()).append('\n'); buf.append(getGroupName()).append('.').append(getName()).append(": ").append(getDescription()).append('\n');
long periods[] = getPeriods(); long periods[] = getPeriods();
Arrays.sort(periods); Arrays.sort(periods);
@ -103,7 +103,7 @@ public class RateStat {
} }
public void store(OutputStream out, String prefix) throws IOException { public void store(OutputStream out, String prefix) throws IOException {
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(1024);
buf.append(NL); buf.append(NL);
buf.append("################################################################################").append(NL); buf.append("################################################################################").append(NL);
buf.append("# Rate: ").append(_groupName).append(": ").append(_statName).append(NL); buf.append("# Rate: ").append(_groupName).append(": ").append(_statName).append(NL);
@ -112,7 +112,7 @@ public class RateStat {
out.write(buf.toString().getBytes()); out.write(buf.toString().getBytes());
buf = null; buf = null;
for (int i = 0; i < _rates.length; i++) { for (int i = 0; i < _rates.length; i++) {
StringBuffer rbuf = new StringBuffer(256); StringBuffer rbuf = new StringBuffer(1024);
rbuf.append("#######").append(NL); rbuf.append("#######").append(NL);
rbuf.append("# Period : ").append(DataHelper.formatDuration(_rates[i].getPeriod())).append(" for rate ") rbuf.append("# Period : ").append(DataHelper.formatDuration(_rates[i].getPeriod())).append(" for rate ")
.append(_groupName).append(" - ").append(_statName).append(NL); .append(_groupName).append(" - ").append(_statName).append(NL);

View File

@ -80,18 +80,20 @@ public class StatManager {
} }
public void coallesceStats() { public void coallesceStats() {
for (Iterator iter = getFrequencyNames().iterator(); iter.hasNext();) { synchronized (_frequencyStats) {
String name = (String) iter.next(); for (Iterator iter = _frequencyStats.values().iterator(); iter.hasNext();) {
FrequencyStat stat = getFrequency(name); FrequencyStat stat = (FrequencyStat)iter.next();
if (stat != null) { if (stat != null) {
stat.coallesceStats(); stat.coallesceStats();
}
} }
} }
for (Iterator iter = getRateNames().iterator(); iter.hasNext();) { synchronized (_rateStats) {
String name = (String) iter.next(); for (Iterator iter = _rateStats.values().iterator(); iter.hasNext();) {
RateStat stat = getRate(name); RateStat stat = (RateStat)iter.next();
if (stat != null) { if (stat != null) {
stat.coallesceStats(); stat.coallesceStats();
}
} }
} }
} }
@ -105,11 +107,11 @@ public class StatManager {
} }
public Set getFrequencyNames() { public Set getFrequencyNames() {
return Collections.unmodifiableSet(new HashSet(_frequencyStats.keySet())); return new HashSet(_frequencyStats.keySet());
} }
public Set getRateNames() { public Set getRateNames() {
return Collections.unmodifiableSet(new HashSet(_rateStats.keySet())); return new HashSet(_rateStats.keySet());
} }
/** is the given stat a monitored rate? */ /** is the given stat a monitored rate? */

View File

@ -18,7 +18,7 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
@ -68,7 +68,7 @@ public class LogManager {
private String _location; private String _location;
private List _records; private List _records;
private Set _limits; private List _limits;
private Map _logs; private Map _logs;
private LogWriter _writer; private LogWriter _writer;
@ -88,7 +88,7 @@ public class LogManager {
public LogManager(I2PAppContext context) { public LogManager(I2PAppContext context) {
_displayOnScreen = true; _displayOnScreen = true;
_records = new ArrayList(); _records = new ArrayList();
_limits = new HashSet(); _limits = new ArrayList(128);
_logs = new HashMap(128); _logs = new HashMap(128);
_defaultLimit = Log.DEBUG; _defaultLimit = Log.DEBUG;
_configLastRead = 0; _configLastRead = 0;
@ -197,7 +197,6 @@ public class LogManager {
// //
private void loadConfig() { private void loadConfig() {
Properties p = new Properties();
File cfgFile = new File(_location); File cfgFile = new File(_location);
if ((_configLastRead > 0) && (_configLastRead >= cfgFile.lastModified())) { if ((_configLastRead > 0) && (_configLastRead >= cfgFile.lastModified())) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -207,6 +206,7 @@ public class LogManager {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Loading config from " + _location); _log.debug("Loading config from " + _location);
} }
Properties p = new Properties();
FileInputStream fis = null; FileInputStream fis = null;
try { try {
fis = new FileInputStream(cfgFile); fis = new FileInputStream(cfgFile);
@ -293,7 +293,8 @@ public class LogManager {
LogLimit lim = new LogLimit(name, Log.getLevel(val)); LogLimit lim = new LogLimit(name, Log.getLevel(val));
//_log.debug("Limit found for " + name + " as " + val); //_log.debug("Limit found for " + name + " as " + val);
synchronized (_limits) { synchronized (_limits) {
_limits.add(lim); if (!_limits.contains(lim))
_limits.add(lim);
} }
} }
} }
@ -366,10 +367,10 @@ public class LogManager {
} }
private List getLimits(Log log) { private List getLimits(Log log) {
ArrayList limits = new ArrayList(); ArrayList limits = new ArrayList(4);
synchronized (_limits) { synchronized (_limits) {
for (Iterator iter = _limits.iterator(); iter.hasNext();) { for (int i = 0; i < _limits.size(); i++) {
LogLimit limit = (LogLimit) iter.next(); LogLimit limit = (LogLimit)_limits.get(i);
if (limit.matches(log)) limits.add(limit); if (limit.matches(log)) limits.add(limit);
} }
} }
@ -395,6 +396,8 @@ public class LogManager {
List _removeAll() { List _removeAll() {
List vals = null; List vals = null;
synchronized (_records) { synchronized (_records) {
if (_records.size() <= 0)
return null;
vals = new ArrayList(_records); vals = new ArrayList(_records);
_records.clear(); _records.clear();
} }
@ -431,7 +434,7 @@ public class LogManager {
} }
public void shutdown() { public void shutdown() {
_log.log(Log.CRIT, "Shutting down logger", new Exception("Shutdown")); _log.log(Log.CRIT, "Shutting down logger");
_writer.flushRecords(); _writer.flushRecords();
} }

View File

@ -27,7 +27,7 @@ class LogRecordFormatter {
private final static int MAX_PRIORITY_LENGTH = 5; private final static int MAX_PRIORITY_LENGTH = 5;
public static String formatRecord(LogManager manager, LogRecord rec) { public static String formatRecord(LogManager manager, LogRecord rec) {
StringBuffer buf = new StringBuffer(); StringBuffer buf = new StringBuffer(1024);
char format[] = manager._getFormat(); char format[] = manager._getFormat();
for (int i = 0; i < format.length; ++i) { for (int i = 0; i < format.length; ++i) {
switch ((int) format[i]) { switch ((int) format[i]) {

View File

@ -53,6 +53,7 @@ class LogWriter implements Runnable {
public void flushRecords() { public void flushRecords() {
try { try {
List records = _manager._removeAll(); List records = _manager._removeAll();
if (records == null) return;
for (int i = 0; i < records.size(); i++) { for (int i = 0; i < records.size(); i++) {
LogRecord rec = (LogRecord) records.get(i); LogRecord rec = (LogRecord) records.get(i);
writeRecord(rec); writeRecord(rec);
@ -64,13 +65,10 @@ class LogWriter implements Runnable {
System.err.println("Error flushing the records"); System.err.println("Error flushing the records");
} }
} }
records.clear();
try {
Thread.sleep(30);
} catch (InterruptedException ie) {
}
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
} finally {
try { Thread.sleep(100); } catch (InterruptedException ie) {}
} }
long now = Clock.getInstance().now(); long now = Clock.getInstance().now();
if (now - _lastReadConfig > CONFIG_READ_ITERVAL) { if (now - _lastReadConfig > CONFIG_READ_ITERVAL) {

View File

@ -89,7 +89,7 @@ public class TunnelMessage extends I2NPMessageImpl {
if ( (_tunnelId == null) || (_data == null) || (_data.length <= 0) ) if ( (_tunnelId == null) || (_data == null) || (_data.length <= 0) )
throw new I2NPMessageException("Not enough data to write out"); throw new I2NPMessageException("Not enough data to write out");
ByteArrayOutputStream os = new ByteArrayOutputStream(32); ByteArrayOutputStream os = new ByteArrayOutputStream(4096);
try { try {
_tunnelId.writeBytes(os); _tunnelId.writeBytes(os);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@ -9,6 +9,7 @@ package net.i2p.router;
*/ */
import net.i2p.util.Clock; import net.i2p.util.Clock;
import net.i2p.util.Log;
/** /**
* Base implementation of a Job * Base implementation of a Job
*/ */
@ -39,7 +40,8 @@ public abstract class JobImpl implements Job {
} }
void addedToQueue() { void addedToQueue() {
_addedBy = new Exception(); if (_context.logManager().getLog(JobImpl.class).shouldLog(Log.DEBUG))
_addedBy = new Exception();
} }
public Exception getAddedBy() { return _addedBy; } public Exception getAddedBy() { return _addedBy; }

View File

@ -90,9 +90,6 @@ public class JobQueue {
private final static int DEFAULT_MAX_WAITING_JOBS = 20; private final static int DEFAULT_MAX_WAITING_JOBS = 20;
private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs";
static {
}
/** /**
* queue runners wait on this whenever they're not doing anything, and * queue runners wait on this whenever they're not doing anything, and
* this gets notified *once* whenever there are ready jobs * this gets notified *once* whenever there are ready jobs
@ -229,7 +226,17 @@ public class JobQueue {
} }
public void allowParallelOperation() { _allowParallelOperation = true; } public void allowParallelOperation() { _allowParallelOperation = true; }
void shutdown() { _alive = false; } void shutdown() {
_alive = false;
StringBuffer buf = new StringBuffer(1024);
buf.append("jobs: \nready jobs: ").append(_readyJobs.size()).append("\n\t");
for (int i = 0; i < _readyJobs.size(); i++)
buf.append(_readyJobs.get(i).toString()).append("\n\t");
buf.append("\n\ntimed jobs: ").append(_timedJobs.size()).append("\n\t");
for (int i = 0; i < _timedJobs.size(); i++)
buf.append(_timedJobs.get(i).toString()).append("\n\t");
_log.log(Log.CRIT, buf.toString());
}
boolean isAlive() { return _alive; } boolean isAlive() { return _alive; }
/** /**
@ -276,7 +283,7 @@ public class JobQueue {
private int checkJobTimings() { private int checkJobTimings() {
boolean newJobsReady = false; boolean newJobsReady = false;
long now = _context.clock().now(); long now = _context.clock().now();
ArrayList toAdd = new ArrayList(4); ArrayList toAdd = null;
synchronized (_timedJobs) { synchronized (_timedJobs) {
for (int i = 0; i < _timedJobs.size(); i++) { for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i); Job j = (Job)_timedJobs.get(i);
@ -285,6 +292,7 @@ public class JobQueue {
if (j instanceof JobImpl) if (j instanceof JobImpl)
((JobImpl)j).madeReady(); ((JobImpl)j).madeReady();
if (toAdd == null) toAdd = new ArrayList(4);
toAdd.add(j); toAdd.add(j);
_timedJobs.remove(i); _timedJobs.remove(i);
i--; // so the index stays consistent i--; // so the index stays consistent
@ -294,7 +302,15 @@ public class JobQueue {
int ready = 0; int ready = 0;
synchronized (_readyJobs) { synchronized (_readyJobs) {
_readyJobs.addAll(toAdd); if (toAdd != null) {
// rather than addAll, which allocs a byte array rv before adding,
// we iterate, since toAdd is usually going to only be 1 or 2 entries
// and since readyJobs will often have the space, we can avoid the
// extra alloc. (no, i'm not just being insane - i'm updating this based
// on some profiling data ;)
for (int i = 0; i < toAdd.size(); i++)
_readyJobs.add(toAdd.get(i));
}
ready = _readyJobs.size(); ready = _readyJobs.size();
} }
@ -399,10 +415,38 @@ public class JobQueue {
public void offsetChanged(long delta) { public void offsetChanged(long delta) {
if (_lastLimitUpdated > 0) if (_lastLimitUpdated > 0)
_lastLimitUpdated += delta; _lastLimitUpdated += delta;
updateJobTimings(delta);
} }
} }
/**
* Update the clock data for all jobs in process or scheduled for
* completion.
*/
private void updateJobTimings(long delta) {
synchronized (_timedJobs) {
for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i);
j.getTiming().offsetChanged(delta);
}
}
synchronized (_readyJobs) {
for (int i = 0; i < _readyJobs.size(); i++) {
Job j = (Job)_readyJobs.get(i);
j.getTiming().offsetChanged(delta);
}
}
synchronized (_runnerLock) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
JobQueueRunner runner = (JobQueueRunner)iter.next();
Job job = runner.getCurrentJob();
if (job != null)
job.getTiming().offsetChanged(delta);
}
}
}
/** /**
* calculate and update the job timings * calculate and update the job timings
* if it was lagged too much or took too long to run, spit out * if it was lagged too much or took too long to run, spit out

View File

@ -34,12 +34,14 @@ class JobQueueRunner implements Runnable {
public void stopRunning() { _keepRunning = false; } public void stopRunning() { _keepRunning = false; }
public void run() { public void run() {
long lastActive = _context.clock().now(); long lastActive = _context.clock().now();
long jobNum = 0;
while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) { while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
try { try {
Job job = _context.jobQueue().getNext(); Job job = _context.jobQueue().getNext();
if (job == null) { if (job == null) {
if (_log.shouldLog(Log.ERROR)) if (_context.router().isAlive())
_log.error("getNext returned null - dead?"); if (_log.shouldLog(Log.ERROR))
_log.error("getNext returned null - dead?");
continue; continue;
} }
long now = _context.clock().now(); long now = _context.clock().now();
@ -85,13 +87,18 @@ class JobQueueRunner implements Runnable {
lastActive = _context.clock().now(); lastActive = _context.clock().now();
_lastJob = _currentJob; _lastJob = _currentJob;
_currentJob = null; _currentJob = null;
jobNum++;
if ( (jobNum % 10) == 0)
System.gc();
} catch (Throwable t) { } catch (Throwable t) {
if (_log.shouldLog(Log.CRIT)) if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "WTF, error running?", t); _log.log(Log.CRIT, "WTF, error running?", t);
} }
} }
if (_log.shouldLog(Log.CRIT)) if (_context.router().isAlive())
_log.log(Log.CRIT, "Queue runner " + _id + " exiting"); if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Queue runner " + _id + " exiting");
_context.jobQueue().removeRunner(_id); _context.jobQueue().removeRunner(_id);
} }
@ -102,6 +109,7 @@ class JobQueueRunner implements Runnable {
try { try {
if (_log.shouldLog(Log.CRIT)) if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Router ran out of memory, shutting down", oom); _log.log(Log.CRIT, "Router ran out of memory, shutting down", oom);
_log.log(Log.CRIT, _currentJob.getClass().getName());
_context.router().shutdown(); _context.router().shutdown();
} catch (Throwable t) { } catch (Throwable t) {
System.err.println("***Router ran out of memory, shutting down hard"); System.err.println("***Router ran out of memory, shutting down hard");

View File

@ -25,7 +25,7 @@ public class JobTiming implements Clock.ClockUpdateListener {
_start = context.clock().now(); _start = context.clock().now();
_actualStart = 0; _actualStart = 0;
_actualEnd = 0; _actualEnd = 0;
context.clock().addUpdateListener(this); //context.clock().addUpdateListener(this);
} }
/** /**

View File

@ -5,7 +5,7 @@ import java.io.IOException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.TimeZone; import java.util.TimeZone;
@ -94,7 +94,7 @@ public class MessageHistory {
_doLog = DEFAULT_KEEP_MESSAGE_HISTORY; _doLog = DEFAULT_KEEP_MESSAGE_HISTORY;
_historyFile = filename; _historyFile = filename;
_localIdent = getName(_context.routerHash()); _localIdent = getName(_context.routerHash());
_unwrittenEntries = new LinkedList(); _unwrittenEntries = new ArrayList(64);
updateSettings(); updateSettings();
addEntry(getPrefix() + "** Router initialized (started up or changed identities)"); addEntry(getPrefix() + "** Router initialized (started up or changed identities)");
_context.jobQueue().addJob(_writeJob); _context.jobQueue().addJob(_writeJob);
@ -338,7 +338,7 @@ public class MessageHistory {
*/ */
public void sendMessage(String messageType, long messageId, Date expiration, Hash peer, boolean sentOk) { public void sendMessage(String messageType, long messageId, Date expiration, Hash peer, boolean sentOk) {
if (!_doLog) return; if (!_doLog) return;
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(256);
buf.append(getPrefix()); buf.append(getPrefix());
buf.append("send [").append(messageType).append("] message [").append(messageId).append("] "); buf.append("send [").append(messageType).append("] message [").append(messageId).append("] ");
buf.append("to [").append(getName(peer)).append("] "); buf.append("to [").append(getName(peer)).append("] ");
@ -363,7 +363,7 @@ public class MessageHistory {
*/ */
public void receiveMessage(String messageType, long messageId, Date expiration, Hash from, boolean isValid) { public void receiveMessage(String messageType, long messageId, Date expiration, Hash from, boolean isValid) {
if (!_doLog) return; if (!_doLog) return;
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(256);
buf.append(getPrefix()); buf.append(getPrefix());
buf.append("receive [").append(messageType).append("] with id [").append(messageId).append("] "); buf.append("receive [").append(messageType).append("] with id [").append(messageId).append("] ");
if (from != null) if (from != null)
@ -473,7 +473,7 @@ public class MessageHistory {
if (_doPause) return; if (_doPause) return;
List entries = null; List entries = null;
synchronized (_unwrittenEntries) { synchronized (_unwrittenEntries) {
entries = new LinkedList(_unwrittenEntries); entries = new ArrayList(_unwrittenEntries);
_unwrittenEntries.clear(); _unwrittenEntries.clear();
} }
writeEntries(entries); writeEntries(entries);

View File

@ -35,7 +35,7 @@ public class MessageValidator {
public MessageValidator(RouterContext context) { public MessageValidator(RouterContext context) {
_log = context.logManager().getLog(MessageValidator.class); _log = context.logManager().getLog(MessageValidator.class);
_receivedIdExpirations = new TreeMap(); _receivedIdExpirations = new TreeMap();
_receivedIds = new HashSet(1024); _receivedIds = new HashSet(32*1024);
_receivedIdLock = new Object(); _receivedIdLock = new Object();
_context = context; _context = context;
} }
@ -130,4 +130,10 @@ public class MessageValidator {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Cleaned out " + toRemoveDates.size() + " expired messageIds, leaving " + _receivedIds.size() + " remaining"); _log.info("Cleaned out " + toRemoveDates.size() + " expired messageIds, leaving " + _receivedIds.size() + " remaining");
} }
void shutdown() {
StringBuffer buf = new StringBuffer(1024);
buf.append("Validated messages: ").append(_receivedIds.size());
_log.log(Log.CRIT, buf.toString());
}
} }

View File

@ -115,7 +115,7 @@ public class OutNetMessage {
public long getMessageSize() { public long getMessageSize() {
if (_messageSize <= 0) { if (_messageSize <= 0) {
try { try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); // large enough to hold most messages ByteArrayOutputStream baos = new ByteArrayOutputStream(2048); // large enough to hold most messages
_message.writeBytes(baos); _message.writeBytes(baos);
long sz = baos.size(); long sz = baos.size();
baos.reset(); baos.reset();
@ -136,7 +136,7 @@ public class OutNetMessage {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); // large enough to hold most messages ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); // large enough to hold most messages
_message.writeBytes(baos); _message.writeBytes(baos);
byte data[] = baos.toByteArray(); byte data[] = baos.toByteArray();
baos.reset(); _messageSize = data.length;
return data; return data;
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
_log.error("Error serializing the I2NPMessage for the OutNetMessage", dfe); _log.error("Error serializing the I2NPMessage for the OutNetMessage", dfe);

View File

@ -392,6 +392,8 @@ public class Router {
try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); } try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); }
try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); } try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); }
try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); } try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); }
try { _context.messageRegistry().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message registry", t); }
try { _context.messageValidator().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message validator", t); }
try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); } try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); }
dumpStats(); dumpStats();
_log.log(Log.CRIT, "Shutdown complete", new Exception("Shutdown")); _log.log(Log.CRIT, "Shutdown complete", new Exception("Shutdown"));
@ -413,7 +415,7 @@ public class Router {
private class ShutdownHook extends Thread { private class ShutdownHook extends Thread {
public void run() { public void run() {
_log.log(Log.CRIT, "Shutting down the router...", new Exception("Shutting down")); _log.log(Log.CRIT, "Shutting down the router...");
shutdown(); shutdown();
} }
} }

View File

@ -162,7 +162,7 @@ public class StatisticsManager implements Service {
} }
private static String renderRate(Rate rate, boolean fudgeQuantity) { private static String renderRate(Rate rate, boolean fudgeQuantity) {
StringBuffer buf = new StringBuffer(255); StringBuffer buf = new StringBuffer(128);
buf.append(num(rate.getAverageValue())).append(';'); buf.append(num(rate.getAverageValue())).append(';');
buf.append(num(rate.getExtremeAverageValue())).append(';'); buf.append(num(rate.getExtremeAverageValue())).append(';');
buf.append(pct(rate.getPercentageOfLifetimeValue())).append(';'); buf.append(pct(rate.getPercentageOfLifetimeValue())).append(';');

View File

@ -109,8 +109,9 @@ public class ClientConnectionRunner {
/** die a horrible death */ /** die a horrible death */
void stopRunning() { void stopRunning() {
if (_dead) return; if (_dead) return;
_log.error("Stop the I2CP connection! current leaseSet: " if (_context.router().isAlive())
+ _currentLeaseSet, new Exception("Stop client connection")); _log.error("Stop the I2CP connection! current leaseSet: "
+ _currentLeaseSet, new Exception("Stop client connection"));
_dead = true; _dead = true;
// we need these keys to unpublish the leaseSet // we need these keys to unpublish the leaseSet
if (_reader != null) _reader.stopReading(); if (_reader != null) _reader.stopReading();

View File

@ -73,28 +73,34 @@ public class ClientListenerRunner implements Runnable {
socket.close(); socket.close();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Server error accepting", ioe); if (_context.router().isAlive())
_log.error("Server error accepting", ioe);
} catch (Throwable t) { } catch (Throwable t) {
_log.error("Fatal error running client listener - killing the thread!", t); if (_context.router().isAlive())
_log.error("Fatal error running client listener - killing the thread!", t);
return; return;
} }
} }
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error listening on port " + _port, ioe); if (_context.router().isAlive())
_log.error("Error listening on port " + _port, ioe);
} }
if (_socket != null) { if (_socket != null) {
try { _socket.close(); } catch (IOException ioe) {} try { _socket.close(); } catch (IOException ioe) {}
_socket = null; _socket = null;
} }
if (!_context.router().isAlive()) break;
_log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again"); _log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again");
try { Thread.sleep(_nextFailDelay); } catch (InterruptedException ie) {} try { Thread.sleep(_nextFailDelay); } catch (InterruptedException ie) {}
curDelay += _nextFailDelay; curDelay += _nextFailDelay;
_nextFailDelay *= 5; _nextFailDelay *= 5;
} }
_log.error("CANCELING I2CP LISTEN. delay = " + curDelay, new Exception("I2CP Listen cancelled!!!")); if (_context.router().isAlive())
_log.error("CANCELING I2CP LISTEN. delay = " + curDelay, new Exception("I2CP Listen cancelled!!!"));
_running = false; _running = false;
} }

View File

@ -75,11 +75,11 @@ public class ProfileOrganizer {
public ProfileOrganizer(RouterContext context) { public ProfileOrganizer(RouterContext context) {
_context = context; _context = context;
_log = context.logManager().getLog(ProfileOrganizer.class); _log = context.logManager().getLog(ProfileOrganizer.class);
_fastAndReliablePeers = new HashMap(64); _fastAndReliablePeers = new HashMap(16);
_reliablePeers = new HashMap(512); _reliablePeers = new HashMap(16);
_wellIntegratedPeers = new HashMap(256); _wellIntegratedPeers = new HashMap(16);
_notFailingPeers = new HashMap(1024); _notFailingPeers = new HashMap(16);
_failingPeers = new HashMap(4096); _failingPeers = new HashMap(16);
_strictReliabilityOrder = new TreeSet(new InverseReliabilityComparator()); _strictReliabilityOrder = new TreeSet(new InverseReliabilityComparator());
_thresholdSpeedValue = 0.0d; _thresholdSpeedValue = 0.0d;
_thresholdReliabilityValue = 0.0d; _thresholdReliabilityValue = 0.0d;
@ -466,11 +466,9 @@ public class ProfileOrganizer {
all.remove(_us); all.remove(_us);
howMany -= matches.size(); howMany -= matches.size();
Collections.shuffle(all, _random); Collections.shuffle(all, _random);
Set rv = new HashSet(howMany);
for (int i = 0; i < howMany && i < all.size(); i++) { for (int i = 0; i < howMany && i < all.size(); i++) {
rv.add(all.get(i)); matches.add(all.get(i));
} }
matches.addAll(rv);
} }
/** /**

View File

@ -26,6 +26,7 @@ import net.i2p.router.RouterContext;
*/ */
public class ReadConfigJob extends JobImpl { public class ReadConfigJob extends JobImpl {
private final static long DELAY = 30*1000; // reread every 30 seconds private final static long DELAY = 30*1000; // reread every 30 seconds
private long _lastRead = -1;
public ReadConfigJob(RouterContext ctx) { public ReadConfigJob(RouterContext ctx) {
super(ctx); super(ctx);
@ -33,11 +34,23 @@ public class ReadConfigJob extends JobImpl {
public String getName() { return "Read Router Configuration"; } public String getName() { return "Read Router Configuration"; }
public void runJob() { public void runJob() {
doRead(_context); if (shouldReread()) {
doRead(_context);
_lastRead = _context.clock().now();
}
getTiming().setStartAfter(_context.clock().now() + DELAY); getTiming().setStartAfter(_context.clock().now() + DELAY);
_context.jobQueue().addJob(this); _context.jobQueue().addJob(this);
} }
private boolean shouldReread() {
File configFile = new File(_context.router().getConfigFilename());
if (!configFile.exists()) return false;
if (configFile.lastModified() > _lastRead)
return true;
else
return false;
}
public static void doRead(RouterContext ctx) { public static void doRead(RouterContext ctx) {
Router r = ctx.router(); Router r = ctx.router();
String f = r.getConfigFilename(); String f = r.getConfigFilename();

View File

@ -41,6 +41,15 @@ public class OutboundMessageRegistry {
_context.jobQueue().addJob(new CleanupPendingMessagesJob()); _context.jobQueue().addJob(new CleanupPendingMessagesJob());
} }
public void shutdown() {
StringBuffer buf = new StringBuffer(1024);
buf.append("Pending messages: ").append(_pendingMessages.size()).append("\n");
for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) {
buf.append(iter.next().toString()).append("\n\t");
}
_log.log(Log.CRIT, buf.toString());
}
public List getOriginalMessages(I2NPMessage message) { public List getOriginalMessages(I2NPMessage message) {
HashSet matches = new HashSet(4); HashSet matches = new HashSet(4);
long beforeSync = _context.clock().now(); long beforeSync = _context.clock().now();

View File

@ -61,8 +61,9 @@ public class VMCommSystem extends CommSystemFacade {
} else { } else {
_context.jobQueue().addJob(msg.getOnSendJob()); _context.jobQueue().addJob(msg.getOnSendJob());
_context.profileManager().messageSent(msg.getTarget().getIdentity().getHash(), "vm", sendTime, msg.getMessageSize()); _context.profileManager().messageSent(msg.getTarget().getIdentity().getHash(), "vm", sendTime, msg.getMessageSize());
_context.statManager().addRateData("transport.sendMessageSize", msg.getMessageSize(), sendTime); byte data[] = msg.getMessageData();
peerSys.receive(msg.getMessage().toByteArray(), _context.routerHash()); _context.statManager().addRateData("transport.sendMessageSize", data.length, sendTime);
peerSys.receive(data, _context.routerHash());
//_context.jobQueue().addJob(new SendJob(peerSys, msg.getMessage(), _context)); //_context.jobQueue().addJob(new SendJob(peerSys, msg.getMessage(), _context));
sendSuccessful = true; sendSuccessful = true;
} }

View File

@ -568,7 +568,8 @@ class TunnelPool {
public void shutdown() { public void shutdown() {
if (_log.shouldLog(Log.INFO)) _log.info("Shutting down tunnel pool"); if (_log.shouldLog(Log.INFO)) _log.info("Shutting down tunnel pool");
_persistenceHelper.writePool(this); if (_persistenceHelper != null)
_persistenceHelper.writePool(this);
_isLive = false; // the subjobs [should] check getIsLive() on each run _isLive = false; // the subjobs [should] check getIsLive() on each run
_outboundTunnels = null; _outboundTunnels = null;
_freeInboundTunnels = null; _freeInboundTunnels = null;