* refactored and revamped the capacity threshold calculation to take

into account various skew situations and the capacity growth constant with
the intent of producing a higher quality threshold whenever possible
* increased the minimum # of fast peers from 4 to 8 (yay), which means we'll
try to have at least some peers to choose from
* added a new router config option - "router.maxParticipatingTunnels".  This is
useful for gracefully shutting down the router (aka set it to 0 and wait until
the router is no longer participating in tunnels, then shutdown).  You can
probably also come up with other situations where this is useful, but I don't
want to spoil all the fun ;)
This commit is contained in:
jrandom
2004-08-23 03:54:55 +00:00
committed by zzz
parent 4564a6e8fd
commit 8e3e8ada32
3 changed files with 183 additions and 40 deletions

View File

@ -29,6 +29,8 @@ class RouterThrottleImpl implements RouterThrottle {
*/
private static int THROTTLE_EVENT_LIMIT = 300;
private static final String PROP_MAX_TUNNELS = "router.maxParticipatingTunnels";
public RouterThrottleImpl(RouterContext context) {
_context = context;
_log = context.logManager().getLog(RouterThrottleImpl.class);
@ -38,6 +40,7 @@ class RouterThrottleImpl implements RouterThrottle {
_context.statManager().createRateStat("tunnel.bytesAllocatedAtAccept", "How many bytes had been 'allocated' for participating tunnels when we accepted a request?", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("router.throttleTunnelProcessingTime1m", "How long it takes to process a message (1 minute average) when we throttle a tunnel?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("router.throttleTunnelProcessingTime10m", "How long it takes to process a message (10 minute average) when we throttle a tunnel?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("router.throttleTunnelMaxExceeded", "How many tunnels we are participating in when we refuse one due to excees?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public boolean acceptNetworkMessage() {
@ -116,10 +119,28 @@ class RouterThrottleImpl implements RouterThrottle {
double bytesPerMsg = (r != null ? r.getAverageValue() : 0);
double bytesPerTunnel = msgsPerTunnel * bytesPerMsg;
int numTunnels = _context.tunnelManager().getParticipatingCount();
double bytesAllocated = (numTunnels + 1) * bytesPerTunnel;
// the max # tunnels throttle is useful for shutting down the router -
// set this to 0, wait a few minutes, and the router can be shut off
// without killing anyone's tunnels
String maxTunnels = _context.getProperty(PROP_MAX_TUNNELS);
if (maxTunnels != null) {
try {
int max = Integer.parseInt(maxTunnels);
if (numTunnels >= max) {
if (_log.shouldLog(Log.WARN))
_log.warn("Refusing tunnel request since we are already participating in "
+ numTunnels + " (our max is " + max + ")");
_context.statManager().addRateData("router.throttleTunnelMaxExceeded", numTunnels, 0);
return false;
}
} catch (NumberFormatException nfe) {
// no default, ignore it
}
}
_context.statManager().addRateData("tunnel.bytesAllocatedAtAccept", (long)bytesAllocated, msg.getTunnelDurationSeconds()*1000);
// todo: um, throttle (include bw usage of the netDb, our own tunnels, the clients,
// and check to see that they are less than the bandwidth limits

View File

@ -39,7 +39,7 @@ public class CapacityCalculator extends Calculator {
}
/** used to adjust each period so that we keep trying to expand the peer's capacity */
private static long GROWTH_FACTOR = 5;
static long GROWTH_FACTOR = 5;
/** the calculator estimates over a 1 hour period */
private static long ESTIMATE_PERIOD = 60*60*1000;

View File

@ -66,7 +66,15 @@ public class ProfileOrganizer {
*
*/
public static final String PROP_MINIMUM_FAST_PEERS = "profileOrganizer.minFastPeers";
public static final int DEFAULT_MINIMUM_FAST_PEERS = 4;
public static final int DEFAULT_MINIMUM_FAST_PEERS = 8;
/**
* Defines the minimum number of 'high capacity' peers that the organizer should
* select when using the mean - if less than this many are available, select the
* capacity by the median.
*
*/
public static final String PROP_MINIMUM_HIGH_CAPACITY_PEERS = "profileOrganizer.minHighCapacityPeers";
public static final int DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS = 10;
/** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */
private Object _reorganizeLock = new Object();
@ -118,12 +126,12 @@ public class ProfileOrganizer {
boolean rightBigger = rval > lval;
if (_log.shouldLog(Log.DEBUG))
_log.debug("The capacity of " + right.getPeer().toBase64()
+ " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
+ " as larger: r=" + right.getCapacityValue()
+ " l="
+ left.getCapacityValue());
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("The capacity of " + right.getPeer().toBase64()
// + " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
// + " as larger: r=" + right.getCapacityValue()
// + " l="
// + left.getCapacityValue());
if (rightBigger)
return 1;
@ -399,7 +407,13 @@ public class ProfileOrganizer {
}
_strictCapacityOrder = reordered;
calculateThresholds(allPeers);
locked_calculateThresholds(allPeers);
_failingPeers.clear();
_fastPeers.clear();
_highCapacityPeers.clear();
_notFailingPeers.clear();
_wellIntegratedPeers.clear();
for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next();
@ -501,13 +515,16 @@ public class ProfileOrganizer {
/**
* Update the thresholds based on the profiles in this set. currently
* implements the capacity threshold based on the median capacity (ignoring
* failing or inactive peers), using the median speed from that group to
* define the speed threshold, and use the mean integration value from the
* implements the capacity threshold based on the mean capacity of active
* and nonfailing peers (falling back on the median if that results in too
* few peers. We then use the median speed from that group to define the
* speed threshold, and use the mean integration value from the
* high capacity group to define the integration threshold.
*
*/
private void calculateThresholds(Set allPeers) {
private void locked_calculateThresholds(Set allPeers) {
double totalCapacity = 0;
double totalIntegration = 0;
Set reordered = new TreeSet(_comp);
for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next();
@ -518,37 +535,100 @@ public class ProfileOrganizer {
if (profile.getIsFailing() || (!profile.getIsActive()))
continue;
totalCapacity += profile.getCapacityValue();
totalIntegration += profile.getIntegrationValue();
reordered.add(profile);
}
locked_calculateCapacityThreshold(totalCapacity, reordered);
locked_calculateSpeedThreshold(reordered);
_thresholdIntegrationValue = 1.0d * avg(totalIntegration, reordered.size());
}
/**
* Update the _thresholdCapacityValue by using a few simple formulas run
* against the specified peers. Ideally, we set the threshold capacity to
* the mean, as long as that gives us enough peers and the mean is a "growth"
* value. We fall back on the capacity of the top K-th capacity, or the
* mean, or the base growth value, depending on various circumstances.
*
* @param reordered ordered set of PeerProfile objects, ordered by capacity
* (highest first) for active nonfailing peers
*/
private void locked_calculateCapacityThreshold(double totalCapacity, Set reordered) {
int numNotFailing = reordered.size();
// how many are in the "top half" of the high capacity peers?
int i = 0;
int threshold = 0;
if (numNotFailing > 0)
threshold = numNotFailing / 2;
for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
double meanCapacity = avg(totalCapacity, numNotFailing);
long baseline = CapacityCalculator.GROWTH_FACTOR;
int minHighCapacityPeers = getMinimumHighCapacityPeers();
int numExceedingMean = 0;
int numExceedingBaseline = 0;
double thresholdAtMedian = 0;
double thresholdAtMinHighCap = 0;
int cur = 0;
for (Iterator iter = reordered.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next();
if (i >= threshold) {
_thresholdCapacityValue = profile.getCapacityValue();
break;
double val = profile.getCapacityValue();
if (val > meanCapacity)
numExceedingMean++;
if (val > baseline)
numExceedingBaseline++;
if (cur == reordered.size()/2)
thresholdAtMedian = val;
if (cur == minHighCapacityPeers)
thresholdAtMinHighCap = val;
cur++;
}
if (meanCapacity > baseline) {
// our average is doing well (growing, not recovering from failures)
if (numExceedingMean > minHighCapacityPeers) {
if (_log.shouldLog(Log.INFO))
_log.info("Our average capacity is doing well [" + meanCapacity
+ "], and includes " + numExceedingMean);
_thresholdCapacityValue = meanCapacity;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Our average capacity is doing well [" + meanCapacity
+ "], but it is skewed to only have " + numExceedingMean
+ " so falling back on the top few to " + thresholdAtMinHighCap);
_thresholdCapacityValue = thresholdAtMinHighCap;
}
} else {
// our average isn't doing well (its recovering from failures)
if (numExceedingBaseline > minHighCapacityPeers) {
if (_log.shouldLog(Log.INFO))
_log.info("Our average capacity isn't doing well [" + meanCapacity
+ "], but the baseline has " + numExceedingBaseline);
_thresholdCapacityValue = baseline+.0001;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Our average capacity isn't doing well [" + meanCapacity
+ "], and the baseline has " + numExceedingBaseline
+ " so falling back on the median of " + thresholdAtMedian);
_thresholdCapacityValue = thresholdAtMedian;
}
}
}
/**
* Update the _thresholdSpeedValue by calculating the median speed of all
* high capacity peers.
*
* @param reordered ordered set of PeerProfile objects, ordered by capacity
* (highest first) for active nonfailing peers
*/
private void locked_calculateSpeedThreshold(Set reordered) {
Set speeds = new TreeSet();
int numActive = 0;
double totalIntegration = 0;
double totalSpeed = 0;
for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
for (Iterator iter = reordered.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next();
if (profile.getCapacityValue() >= _thresholdCapacityValue) {
if (profile.getIsActive()) {
numActive++;
if (profile.getIntegrationValue() > 0)
totalIntegration += profile.getIntegrationValue();
// duplicates being clobbered is fine by us
speeds.add(new Double(0-profile.getSpeedValue()));
}
// duplicates being clobbered is fine by us
speeds.add(new Double(0-profile.getSpeedValue()));
} else {
// its ordered
break;
@ -556,7 +636,7 @@ public class ProfileOrganizer {
}
// calc the median speed of high capacity peers
i = 0;
int i = 0;
for (Iterator iter = speeds.iterator(); iter.hasNext(); i++) {
Double speed = (Double)iter.next();
if (i >= (speeds.size() / 2)) {
@ -564,10 +644,8 @@ public class ProfileOrganizer {
break;
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Threshold value for speed: " + _thresholdSpeedValue + " with speeds: " + speeds);
_thresholdIntegrationValue = 1.0d * avg(totalIntegration, numActive);
if (_log.shouldLog(Log.INFO))
_log.info("Threshold value for speed: " + _thresholdSpeedValue + " out of speeds: " + speeds);
}
/** simple average, or 0 if NaN */
@ -808,6 +886,50 @@ public class ProfileOrganizer {
return DEFAULT_MINIMUM_FAST_PEERS;
}
/**
* Defines the minimum number of 'fast' peers that the organizer should select. If
* the profile calculators derive a threshold that does not select at least this many peers,
* the threshold will be overridden to make sure this many peers are in the fast+reliable group.
* This parameter should help deal with a lack of diversity in the tunnels created when some
* peers are particularly fast.
*
* @return minimum number of peers to be placed in the 'fast' group
*/
protected int getMinimumHighCapacityPeers() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_MINIMUM_HIGH_CAPACITY_PEERS);
if (val != null) {
try {
int rv = Integer.parseInt(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_MINIMUM_HIGH_CAPACITY_PEERS + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Minimum high capacity peers improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_MINIMUM_HIGH_CAPACITY_PEERS, ""+DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS);
if (val != null) {
try {
int rv = Integer.parseInt(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router context said " + PROP_MINIMUM_HIGH_CAPACITY_PEERS + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Minimum high capacity peers improperly set in the router environment [" + val + "]", nfe);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("no config for " + PROP_MINIMUM_HIGH_CAPACITY_PEERS + ", using " + DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS);
return DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS;
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }