2004-09-07 jrandom

* Write the native libraries to the current directory when they are loaded
      from a resource, and load them from that file on subsequent runs (in
      turn, we no longer *cough* delete the running libraries...)
    * Added support for a graceful restart.
    * Added new pseudo-shutdown hook specific to the router, allowing
      applications to request tasks to be run when the router shuts down.  We
      use this for integration with the service manager, since otherwise a
      graceful shutdown would cause a timeout, followed by a forced hard
      shutdown.
    * Handle a bug in the SimpleTimer with requeued tasks.
    * Made the capacity calculator a bit more dynamic by not outright ignoring
      the otherwise valid capacity data for a period with a single rejected
      tunnel (except for the 10 minute period).  In addition, peers with an
      equal capacity are ordered by speed rather than by their hashes.
    * Cleaned up the SimpleTimer, addressing some threading and synchronization
      issues.
    * When an I2PTunnel client or httpclient is explicitly closed, destroy the
      associated session (unless there are other clients using it), and deal
      with a closed session when starting a new I2PTunnel instance.
    * Refactoring and logging.
This commit is contained in:
jrandom
2004-09-07 07:17:02 +00:00
committed by zzz
parent e57aa68854
commit 6151d63eac
18 changed files with 611 additions and 394 deletions

View File

@ -59,6 +59,7 @@ public class Router {
private I2PThread.OOMEventListener _oomListener;
private ShutdownHook _shutdownHook;
private I2PThread _gracefulShutdownDetector;
private Set _shutdownTasks;
public final static String PROP_CONFIG_FILE = "router.configLocation";
@ -123,6 +124,8 @@ public class Router {
_gracefulShutdownDetector.setDaemon(true);
_gracefulShutdownDetector.setName("Graceful shutdown hook");
_gracefulShutdownDetector.start();
_shutdownTasks = new HashSet(0);
}
/**
@ -542,6 +545,12 @@ public class Router {
buf.setLength(0);
}
public void addShutdownTask(Runnable task) {
synchronized (_shutdownTasks) {
_shutdownTasks.add(task);
}
}
public static final int EXIT_GRACEFUL = 2;
public static final int EXIT_HARD = 3;
public static final int EXIT_OOM = 10;
@ -564,6 +573,14 @@ public class Router {
try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); }
_context.listContexts().remove(_context);
dumpStats();
try {
for (Iterator iter = _shutdownTasks.iterator(); iter.hasNext(); ) {
Runnable task = (Runnable)iter.next();
task.run();
}
} catch (Throwable t) {
_log.log(Log.CRIT, "Error running shutdown task", t);
}
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete", new Exception("Shutdown"));
try { _context.logManager().shutdown(); } catch (Throwable t) { }
File f = new File(getPingFile());

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.23 $ $Date: 2004/09/04 16:54:09 $";
public final static String ID = "$Revision: 1.24 $ $Date: 2004/09/06 00:21:26 $";
public final static String VERSION = "0.4";
public final static long BUILD = 6;
public final static long BUILD = 7;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -78,8 +78,6 @@ public class CapacityCalculator extends Calculator {
Rate curAccepted = acceptStat.getRate(period);
Rate curRejected = rejectStat.getRate(period);
Rate curFailed = failedStat.getRate(period);
if (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0)
return 0.0d;
long eventCount = 0;
if (curAccepted != null)
@ -91,6 +89,12 @@ public class CapacityCalculator extends Calculator {
failed = curFailed.getCurrentEventCount() + curFailed.getLastEventCount();
if (failed > 0)
val -= failed * stretch;
if ( (period == 10*60*1000) && (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0) )
return 0.0d;
else
val -= stretch * (curRejected.getCurrentEventCount() + curRejected.getLastEventCount());
if (val >= 0) {
return (val + GROWTH_FACTOR) * periodWeight(period);
} else {

View File

@ -0,0 +1,54 @@
package net.i2p.router.peermanager;
import java.util.Comparator;
import net.i2p.data.DataHelper;
/**
* Order profiles by their capacity, but backwards (highest capacity / value first).
*
*/
class InverseCapacityComparator implements Comparator {
/**
* Compare the two objects backwards. The standard comparator returns
* -1 if lhs is less than rhs, 1 if lhs is greater than rhs, or 0 if they're
* equal. To keep a strict ordering, we measure peers with equal capacity
* values according to their speed
*
* @return -1 if the right hand side is smaller, 1 if the left hand side is
* smaller, or 0 if they are the same peer (Comparator.compare() inverted)
*/
public int compare(Object lhs, Object rhs) {
if ( (lhs == null) || (rhs == null) || (!(lhs instanceof PeerProfile)) || (!(rhs instanceof PeerProfile)) )
throw new ClassCastException("Only profiles can be compared - lhs = " + lhs + " rhs = " + rhs);
PeerProfile left = (PeerProfile)lhs;
PeerProfile right= (PeerProfile)rhs;
double rval = right.getCapacityValue();
double lval = left.getCapacityValue();
if (lval == rval) {
rval = right.getSpeedValue();
lval = left.getSpeedValue();
if (lval == rval) {
// note the following call inverts right and left (see: classname)
return DataHelper.compareTo(right.getPeer().getData(), left.getPeer().getData());
} else {
// ok, fall through and compare based on speed, since the capacity is equal
}
}
boolean rightBigger = rval > lval;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("The capacity of " + right.getPeer().toBase64()
// + " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
// + " as larger: r=" + right.getCapacityValue()
// + " l="
// + left.getCapacityValue());
if (rightBigger)
return 1;
else
return -1;
}
}

View File

@ -99,49 +99,12 @@ public class ProfileOrganizer {
_persistenceHelper = new ProfilePersistenceHelper(_context);
}
/**
* Order profiles by their capacity, but backwards (highest capacity / value first).
*
*/
private final class InverseCapacityComparator implements Comparator {
/**
* Compare the two objects backwards. The standard comparator returns
* -1 if lhs is less than rhs, 1 if lhs is greater than rhs, or 0 if they're
* equal. To keep a strict ordering, we measure peers with equal capacity
* values according to their hashes
*
* @return -1 if the right hand side is smaller, 1 if the left hand side is
* smaller, or 0 if they are the same peer (Comparator.compare() inverted)
*/
public int compare(Object lhs, Object rhs) {
if ( (lhs == null) || (rhs == null) || (!(lhs instanceof PeerProfile)) || (!(rhs instanceof PeerProfile)) )
throw new ClassCastException("Only profiles can be compared - lhs = " + lhs + " rhs = " + rhs);
PeerProfile left = (PeerProfile)lhs;
PeerProfile right= (PeerProfile)rhs;
double rval = right.getCapacityValue();
double lval = left.getCapacityValue();
if (lval == rval) // note the following call inverts right and left (see: classname)
return DataHelper.compareTo(right.getPeer().getData(), left.getPeer().getData());
boolean rightBigger = rval > lval;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("The capacity of " + right.getPeer().toBase64()
// + " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
// + " as larger: r=" + right.getCapacityValue()
// + " l="
// + left.getCapacityValue());
if (rightBigger)
return 1;
else
return -1;
}
}
public void setUs(Hash us) { _us = us; }
Hash getUs() { return _us; }
public double getSpeedThreshold() { return _thresholdSpeedValue; }
public double getCapacityThreshold() { return _thresholdCapacityValue; }
public double getIntegrationThreshold() { return _thresholdIntegrationValue; }
/**
* Retrieve the profile for the given peer, if one exists (else null)
@ -207,7 +170,6 @@ public class ProfileOrganizer {
public boolean isHighCapacity(Hash peer) { synchronized (_reorganizeLock) { return _highCapacityPeers.containsKey(peer); } }
public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } }
public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } }
/**
* if a peer sends us more than 5 replies in a searchReply that we cannot
@ -234,8 +196,17 @@ public class ProfileOrganizer {
}
return false;
}
public void exportProfile(Hash profile, OutputStream out) throws IOException {
PeerProfile prof = getProfile(profile);
if (prof != null)
_persistenceHelper.writeProfile(prof, out);
}
public void renderStatusHTML(OutputStream out) throws IOException {
ProfileOrganizerRenderer rend = new ProfileOrganizerRenderer(this, _context);
rend.renderStatusHTML(out);
}
/**
* Return a set of Hashes for peers that are both fast and reliable. If an insufficient
@ -422,12 +393,7 @@ public class ProfileOrganizer {
allPeers.addAll(_notFailingPeers.values());
allPeers.addAll(_highCapacityPeers.values());
allPeers.addAll(_fastPeers.values());
_failingPeers.clear();
_notFailingPeers.clear();
_highCapacityPeers.clear();
_fastPeers.clear();
Set reordered = new TreeSet(_comp);
for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next();
@ -534,9 +500,6 @@ public class ProfileOrganizer {
}
}
public double getSpeedThreshold() { return _thresholdSpeedValue; }
public double getCapacityThreshold() { return _thresholdCapacityValue; }
////////
// no more public stuff below
////////
@ -578,7 +541,6 @@ public class ProfileOrganizer {
_thresholdIntegrationValue = 1.0d * avg(totalIntegration, reordered.size());
}
/**
* Update the _thresholdCapacityValue by using a few simple formulas run
* against the specified peers. Ideally, we set the threshold capacity to
@ -775,116 +737,6 @@ public class ProfileOrganizer {
*/
private boolean shouldDrop(PeerProfile profile) { return false; }
public void exportProfile(Hash profile, OutputStream out) throws IOException {
PeerProfile prof = getProfile(profile);
if (prof != null)
_persistenceHelper.writeProfile(prof, out);
}
public void renderStatusHTML(OutputStream out) throws IOException {
Set peers = selectAllPeers();
long hideBefore = _context.clock().now() - 6*60*60*1000;
TreeMap order = new TreeMap();
for (Iterator iter = peers.iterator(); iter.hasNext();) {
Hash peer = (Hash)iter.next();
if (_us.equals(peer)) continue;
PeerProfile prof = getProfile(peer);
if (prof.getLastSendSuccessful() <= hideBefore) continue;
order.put(peer.toBase64(), prof);
}
int fast = 0;
int reliable = 0;
int integrated = 0;
int failing = 0;
StringBuffer buf = new StringBuffer(16*1024);
buf.append("<h2>Peer Profiles</h2>\n");
buf.append("<table border=\"1\">");
buf.append("<tr>");
buf.append("<td><b>Peer</b> (").append(order.size()).append(", hiding ").append(peers.size()-order.size()).append(")</td>");
buf.append("<td><b>Groups</b></td>");
buf.append("<td><b>Speed</b></td>");
buf.append("<td><b>Capacity</b></td>");
buf.append("<td><b>Integration</b></td>");
buf.append("<td><b>Failing?</b></td>");
buf.append("<td>&nbsp;</td>");
buf.append("</tr>");
for (Iterator iter = order.keySet().iterator(); iter.hasNext();) {
String name = (String)iter.next();
PeerProfile prof = (PeerProfile)order.get(name);
Hash peer = prof.getPeer();
buf.append("<tr>");
buf.append("<td><code>");
if (prof.getIsFailing()) {
buf.append("<font color=\"red\">--").append(peer.toBase64().substring(0,6)).append("</font>");
} else {
if (prof.getIsActive()) {
buf.append("<font color=\"blue\">++").append(peer.toBase64().substring(0,6)).append("</font>");
} else {
buf.append("__").append(peer.toBase64().substring(0,6));
}
}
buf.append("</code></td>");
buf.append("<td>");
int tier = 0;
boolean isIntegrated = false;
synchronized (_reorganizeLock) {
if (_fastPeers.containsKey(peer)) {
tier = 1;
fast++;
reliable++;
} else if (_highCapacityPeers.containsKey(peer)) {
tier = 2;
reliable++;
} else if (_notFailingPeers.containsKey(peer)) {
tier = 3;
} else {
failing++;
}
if (_wellIntegratedPeers.containsKey(peer)) {
isIntegrated = true;
integrated++;
}
}
switch (tier) {
case 1: buf.append("Fast"); break;
case 2: buf.append("High Capacity"); break;
case 3: buf.append("Not Failing"); break;
default: buf.append("Failing"); break;
}
if (isIntegrated) buf.append(", Integrated");
buf.append("<td align=\"right\">").append(num(prof.getSpeedValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getCapacityValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getIntegrationValue())).append("</td>");
buf.append("<td align=\"right\">").append(prof.getIsFailing()).append("</td>");
//buf.append("<td><a href=\"/profile/").append(prof.getPeer().toBase64().substring(0, 32)).append("\">profile.txt</a> ");
//buf.append(" <a href=\"#").append(prof.getPeer().toBase64().substring(0, 32)).append("\">netDb</a></td>");
buf.append("<td><a href=\"netdb.jsp#").append(peer.toBase64().substring(0,6)).append("\">netDb</a></td>\n");
buf.append("</tr>");
}
buf.append("</table>");
buf.append("<i>Definitions:<ul>");
buf.append("<li><b>speed</b>: how many round trip messages can we pump through the peer per minute?</li>");
buf.append("<li><b>capacity</b>: how many tunnels can we ask them to join in an hour?</li>");
buf.append("<li><b>integration</b>: how many new peers have they told us about lately?</li>");
buf.append("<li><b>failing?</b>: is the peer currently swamped (and if possible we should avoid nagging them)?</li>");
buf.append("</ul></i>");
buf.append("Red peers prefixed with '--' means the peer is failing, and blue peers prefixed ");
buf.append("with '++' means we've sent or received a message from them ");
buf.append("in the last five minutes</i><br />");
buf.append("<b>Thresholds:</b><br />");
buf.append("<b>Speed:</b> ").append(num(_thresholdSpeedValue)).append(" (").append(fast).append(" fast peers)<br />");
buf.append("<b>Capacity:</b> ").append(num(_thresholdCapacityValue)).append(" (").append(reliable).append(" high capacity peers)<br />");
buf.append("<b>Integration:</b> ").append(num(_thresholdIntegrationValue)).append(" (").append(integrated).append(" well integrated peers)<br />");
out.write(buf.toString().getBytes());
}
/**
* Defines the minimum number of 'fast' peers that the organizer should select. If
* the profile calculators derive a threshold that does not select at least this many peers,
@ -895,20 +747,6 @@ public class ProfileOrganizer {
* @return minimum number of peers to be placed in the 'fast' group
*/
protected int getMinimumFastPeers() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_MINIMUM_FAST_PEERS);
if (val != null) {
try {
int rv = Integer.parseInt(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_MINIMUM_FAST_PEERS + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Minimum fast peers improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_MINIMUM_FAST_PEERS, ""+DEFAULT_MINIMUM_FAST_PEERS);
if (val != null) {
try {
@ -938,20 +776,6 @@ public class ProfileOrganizer {
* @return minimum number of peers to be placed in the 'fast' group
*/
protected int getMinimumHighCapacityPeers() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_MINIMUM_HIGH_CAPACITY_PEERS);
if (val != null) {
try {
int rv = Integer.parseInt(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_MINIMUM_HIGH_CAPACITY_PEERS + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Minimum high capacity peers improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_MINIMUM_HIGH_CAPACITY_PEERS, ""+DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS);
if (val != null) {
try {
@ -970,7 +794,6 @@ public class ProfileOrganizer {
return DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS;
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }

View File

@ -0,0 +1,134 @@
package net.i2p.router.peermanager;
import java.io.IOException;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
/**
* Helper class to refactor the HTML rendering from out of the ProfileOrganizer
*
*/
class ProfileOrganizerRenderer {
private RouterContext _context;
private ProfileOrganizer _organizer;
public ProfileOrganizerRenderer(ProfileOrganizer organizer, RouterContext context) {
_context = context;
_organizer = organizer;
}
public void renderStatusHTML(OutputStream out) throws IOException {
Set peers = _organizer.selectAllPeers();
long hideBefore = _context.clock().now() - 3*60*60*1000;
TreeMap order = new TreeMap();
for (Iterator iter = peers.iterator(); iter.hasNext();) {
Hash peer = (Hash)iter.next();
if (_organizer.getUs().equals(peer)) continue;
PeerProfile prof = _organizer.getProfile(peer);
if (prof.getLastSendSuccessful() <= hideBefore) continue;
order.put(peer.toBase64(), prof);
}
int fast = 0;
int reliable = 0;
int integrated = 0;
int failing = 0;
StringBuffer buf = new StringBuffer(16*1024);
buf.append("<h2>Peer Profiles</h2>\n");
buf.append("<table border=\"1\">");
buf.append("<tr>");
buf.append("<td><b>Peer</b> (").append(order.size()).append(", hiding ").append(peers.size()-order.size()).append(")</td>");
buf.append("<td><b>Groups</b></td>");
buf.append("<td><b>Speed</b></td>");
buf.append("<td><b>Capacity</b></td>");
buf.append("<td><b>Integration</b></td>");
buf.append("<td><b>Failing?</b></td>");
buf.append("<td>&nbsp;</td>");
buf.append("</tr>");
for (Iterator iter = order.keySet().iterator(); iter.hasNext();) {
String name = (String)iter.next();
PeerProfile prof = (PeerProfile)order.get(name);
Hash peer = prof.getPeer();
buf.append("<tr>");
buf.append("<td><code>");
if (prof.getIsFailing()) {
buf.append("<font color=\"red\">--").append(peer.toBase64().substring(0,6)).append("</font>");
} else {
if (prof.getIsActive()) {
buf.append("<font color=\"blue\">++").append(peer.toBase64().substring(0,6)).append("</font>");
} else {
buf.append("__").append(peer.toBase64().substring(0,6));
}
}
buf.append("</code></td>");
buf.append("<td>");
int tier = 0;
boolean isIntegrated = false;
if (_organizer.isFast(peer)) {
tier = 1;
fast++;
reliable++;
} else if (_organizer.isHighCapacity(peer)) {
tier = 2;
reliable++;
} else if (_organizer.isFailing(peer)) {
failing++;
} else {
tier = 3;
}
if (_organizer.isWellIntegrated(peer)) {
isIntegrated = true;
integrated++;
}
switch (tier) {
case 1: buf.append("Fast"); break;
case 2: buf.append("High Capacity"); break;
case 3: buf.append("Not Failing"); break;
default: buf.append("Failing"); break;
}
if (isIntegrated) buf.append(", Integrated");
buf.append("<td align=\"right\">").append(num(prof.getSpeedValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getCapacityValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getIntegrationValue())).append("</td>");
buf.append("<td align=\"right\">").append(prof.getIsFailing()).append("</td>");
//buf.append("<td><a href=\"/profile/").append(prof.getPeer().toBase64().substring(0, 32)).append("\">profile.txt</a> ");
//buf.append(" <a href=\"#").append(prof.getPeer().toBase64().substring(0, 32)).append("\">netDb</a></td>");
buf.append("<td><a href=\"netdb.jsp#").append(peer.toBase64().substring(0,6)).append("\">netDb</a></td>\n");
buf.append("</tr>");
}
buf.append("</table>");
buf.append("<i>Definitions:<ul>");
buf.append("<li><b>speed</b>: how many round trip messages can we pump through the peer per minute?</li>");
buf.append("<li><b>capacity</b>: how many tunnels can we ask them to join in an hour?</li>");
buf.append("<li><b>integration</b>: how many new peers have they told us about lately?</li>");
buf.append("<li><b>failing?</b>: is the peer currently swamped (and if possible we should avoid nagging them)?</li>");
buf.append("</ul></i>");
buf.append("Red peers prefixed with '--' means the peer is failing, and blue peers prefixed ");
buf.append("with '++' means we've sent or received a message from them ");
buf.append("in the last five minutes</i><br />");
buf.append("<b>Thresholds:</b><br />");
buf.append("<b>Speed:</b> ").append(num(_organizer.getSpeedThreshold())).append(" (").append(fast).append(" fast peers)<br />");
buf.append("<b>Capacity:</b> ").append(num(_organizer.getCapacityThreshold())).append(" (").append(reliable).append(" high capacity peers)<br />");
buf.append("<b>Integration:</b> ").append(num(_organizer.getIntegrationThreshold())).append(" (").append(integrated).append(" well integrated peers)<br />");
out.write(buf.toString().getBytes());
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
}