* implement new 'capacity' concept, which replaces the old 'reliability'
  one for peer selection and organization.  reliability is kept around
  for the moment and shown on the router console, but only to provide a
  comparison (it is not used in any way)
* new stat in the TunnelHistory: failRate
* coallesce TunnelHistory stats (duh!)
* new ProfileOrganizer CLI ("ProfileOrganizer [filename]*")
* implement reasonable 'failure' logic - if they are actively rejecting
  tunnels or tunnels they've agreed to are failing, mark them as failing
* when choosing peers to test, exclude all fast ones
Author: jrandom
Date: 2004-07-20 03:27:34 +00:00
Committed by: zzz
Parent: ef0f1ca1e7
Commit: e8e8c37496
9 changed files with 377 additions and 208 deletions


@ -0,0 +1,111 @@
package net.i2p.router.peermanager;
import net.i2p.router.RouterContext;
import net.i2p.stat.RateStat;
import net.i2p.stat.Rate;
import net.i2p.util.Log;
/**
* Estimate how many of our tunnels the peer can join per hour.
* Pseudocode:
* <pre>
* int growthFactor = 5;
* int capacity = 0;
* foreach i (10, 30, 60, 1440) {
*   if (# tunnels rejected in last $i minutes > 0) continue;
*   int val = (# tunnels joined in last $i minutes) * (60 / $i);
*   val -= (# tunnels failed in last $i minutes) * (60 / $i);
*   if (val >= 0) // if we're failing lots of tunnels, dont grow
*     capacity += ((val + growthFactor) * periodWeight($i));
* }
*
* periodWeight(int period) {
*   switch (period) {
*     case 10:   return .4;
*     case 30:   return .3;
*     case 60:   return .2;
*     case 1440: return .1;
*   }
* }
* </pre>
*
*/
public class CapacityCalculator extends Calculator {
private Log _log;
private RouterContext _context;
public CapacityCalculator(RouterContext context) {
_context = context;
_log = context.logManager().getLog(CapacityCalculator.class);
}
/** used to adjust each period so that we keep trying to expand the peer's capacity */
private static long GROWTH_FACTOR = 5;
/** the calculator estimates over a 1 hour period */
private static long ESTIMATE_PERIOD = 60*60*1000;
public double calc(PeerProfile profile) {
double capacity = 0;
RateStat acceptStat = profile.getTunnelCreateResponseTime();
RateStat rejectStat = profile.getTunnelHistory().getRejectionRate();
RateStat failedStat = profile.getTunnelHistory().getFailedRate();
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 10*60*1000);
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 30*60*1000);
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 60*60*1000);
capacity += estimatePartial(acceptStat, rejectStat, failedStat, 24*60*60*1000);
if (tooOld(profile))
capacity = 1;
capacity += profile.getReliabilityBonus();
return capacity;
}
/**
* If we haven't heard from them in an hour, they aren't too useful.
*
*/
private boolean tooOld(PeerProfile profile) {
if (profile.getIsActive())
return false;
else
return true;
}
private double estimatePartial(RateStat acceptStat, RateStat rejectStat, RateStat failedStat, int period) {
Rate curAccepted = acceptStat.getRate(period);
Rate curRejected = rejectStat.getRate(period);
Rate curFailed = failedStat.getRate(period);
if (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0)
return 0.0d;
long eventCount = 0;
if (curAccepted != null)
eventCount = curAccepted.getCurrentEventCount() + curAccepted.getLastEventCount();
double stretch = ESTIMATE_PERIOD / period;
double val = eventCount * stretch;
long failed = 0;
if (curFailed != null)
failed = curFailed.getCurrentEventCount() + curFailed.getLastEventCount();
if (failed > 0)
val -= failed * stretch;
if (val >= 0) {
return (val + GROWTH_FACTOR) * periodWeight(period);
} else {
// failed too much, don't grow
return 0.0d;
}
}
private double periodWeight(int period) {
switch (period) {
case 10*60*1000: return .4;
case 30*60*1000: return .3;
case 60*60*1000: return .2;
case 24*60*60*1000: return .1;
default: throw new IllegalArgumentException("wtf, period [" + period + "]???");
}
}
}
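To make the weighting concrete, here is a rough worked example of a single term (the event counts are hypothetical): if the accept stat reports 4 events and the reject and fail stats report none over the 10 minute window, then stretch = 3600000 / 600000 = 6, val = 4 * 6 = 24, and that window contributes (24 + 5) * 0.4 = 11.6 to the capacity estimate. The 30 minute, 60 minute, and 24 hour windows are combined the same way with weights .3, .2, and .1, and any window with a rejection contributes nothing. (Note that for the 24 hour window the integer division makes the stretch factor 0, so that term can only ever add the growth factor times .1.)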


@ -1,6 +1,7 @@
package net.i2p.router.peermanager; package net.i2p.router.peermanager;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.stat.Rate;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -31,40 +32,33 @@ public class IsFailingCalculator extends Calculator {
public boolean calcBoolean(PeerProfile profile) { public boolean calcBoolean(PeerProfile profile) {
// have we failed in the last 119 seconds? // have we failed in the last 119 seconds?
if ( (profile.getCommError().getRate(60*1000).getCurrentEventCount() > 0) || if ( (profile.getCommError().getRate(60*1000).getCurrentEventCount() > 0) ||
(profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ) { (profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ||
(profile.getCommError().getRate(10*60*1000).getCurrentEventCount() > 0) ) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + profile.getPeer().toBase64() _log.debug("Peer " + profile.getPeer().toBase64()
+ " is failing because it had a comm error in the last 2 minutes"); + " is failing because it had a comm error recently ");
return true; return true;
} else { } else {
//if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) || //if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) ||
// (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) { // (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) {
// // are they overloaded (or disconnected)? // // are they overloaded (or disconnected)?
// return true; // return true;
//} //}
long recently = _context.clock().now() - GRACE_PERIOD; Rate rejectRate = profile.getTunnelHistory().getRejectionRate().getRate(10*60*1000);
if (rejectRate.getCurrentEventCount() >= 2) {
if (false && (profile.getTunnelHistory().getLastRejected() >= recently) ) {
// have they refused to participate in a tunnel in the last 5 minutes?
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + profile.getPeer().toBase64() _log.debug("Peer " + profile.getPeer().toBase64()
+ " is failing because it refused to participate in a tunnel in the last few minutes"); + " is failing because they rejected some tunnels recently");
return true; return true;
} }
if (false && (profile.getTunnelHistory().getLastFailed() >= recently) ) { Rate failedRate = profile.getTunnelHistory().getFailedRate().getRate(60*1000);
// has a tunnel they participate in failed in the last 5 minutes? if (failedRate.getCurrentEventCount() >= 2) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + profile.getPeer().toBase64() _log.debug("Peer " + profile.getPeer().toBase64()
+ " is failing because it one of their tunnels failed in the last few minutes"); + " is failing because too many of their tunnels failed recently");
return true;
}
if (false && profile.getLastSendFailed() >= recently) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + profile.getPeer().toBase64()
+ " is failing because we couldnt send to it recently");
return true; return true;
} }
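Because the flattened side-by-side hunk above is hard to follow, here is a minimal sketch of how the reworked check reads after this change (names come from the hunk; logging, the surrounding class, and the final fall-through are assumed):

    public boolean calcBoolean(PeerProfile profile) {
        // a comm error in the last minute (current or previous period), or in the
        // current ten minute period, marks the peer as failing
        if ( (profile.getCommError().getRate(60*1000).getCurrentEventCount() > 0) ||
             (profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ||
             (profile.getCommError().getRate(10*60*1000).getCurrentEventCount() > 0) )
            return true;
        // actively rejecting tunnels: two or more rejections in the current ten minute period
        Rate rejectRate = profile.getTunnelHistory().getRejectionRate().getRate(10*60*1000);
        if (rejectRate.getCurrentEventCount() >= 2)
            return true;
        // tunnels they agreed to are failing: two or more failures in the last minute
        Rate failedRate = profile.getTunnelHistory().getFailedRate().getRate(60*1000);
        if (failedRate.getCurrentEventCount() >= 2)
            return true;
        return false;
    }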


@ -84,16 +84,16 @@ class PeerManager {
case PeerSelectionCriteria.PURPOSE_TEST: case PeerSelectionCriteria.PURPOSE_TEST:
// for now, the peers we test will be the reliable ones // for now, the peers we test will be the reliable ones
//_organizer.selectWellIntegratedPeers(criteria.getMinimumRequired(), exclude, curVals); //_organizer.selectWellIntegratedPeers(criteria.getMinimumRequired(), exclude, curVals);
_organizer.selectReliablePeers(criteria.getMinimumRequired(), exclude, curVals); _organizer.selectHighCapacityPeers(criteria.getMinimumRequired(), exclude, curVals);
break; break;
case PeerSelectionCriteria.PURPOSE_TUNNEL: case PeerSelectionCriteria.PURPOSE_TUNNEL:
_organizer.selectFastAndReliablePeers(criteria.getMinimumRequired(), exclude, curVals); _organizer.selectFastPeers(criteria.getMinimumRequired(), exclude, curVals);
break; break;
case PeerSelectionCriteria.PURPOSE_SOURCE_ROUTE: case PeerSelectionCriteria.PURPOSE_SOURCE_ROUTE:
_organizer.selectReliablePeers(criteria.getMinimumRequired(), exclude, curVals); _organizer.selectHighCapacityPeers(criteria.getMinimumRequired(), exclude, curVals);
break; break;
case PeerSelectionCriteria.PURPOSE_GARLIC: case PeerSelectionCriteria.PURPOSE_GARLIC:
_organizer.selectReliablePeers(criteria.getMinimumRequired(), exclude, curVals); _organizer.selectHighCapacityPeers(criteria.getMinimumRequired(), exclude, curVals);
break; break;
default: default:
break; break;
@ -103,8 +103,8 @@ class PeerManager {
_log.warn("We ran out of peers when looking for reachable ones after finding " _log.warn("We ran out of peers when looking for reachable ones after finding "
+ rv.size() + " with " + rv.size() + " with "
+ _organizer.countWellIntegratedPeers() + "/" + _organizer.countWellIntegratedPeers() + "/"
+ _organizer.countReliablePeers() + "/" + _organizer.countHighCapacityPeers() + "/"
+ _organizer.countFastAndReliablePeers() + " integrated/reliable/fast peers"); + _organizer.countFastPeers() + " integrated/high capacity/fast peers");
break; break;
} else { } else {
for (Iterator iter = curVals.iterator(); iter.hasNext(); ) { for (Iterator iter = curVals.iterator(); iter.hasNext(); ) {


@ -32,10 +32,12 @@ public class PeerProfile {
// calculation bonuses // calculation bonuses
private long _speedBonus; private long _speedBonus;
private long _reliabilityBonus; private long _reliabilityBonus;
private long _capacityBonus;
private long _integrationBonus; private long _integrationBonus;
// calculation values // calculation values
private double _speedValue; private double _speedValue;
private double _reliabilityValue; private double _reliabilityValue;
private double _capacityValue;
private double _integrationValue; private double _integrationValue;
private boolean _isFailing; private boolean _isFailing;
// good vs bad behavior // good vs bad behavior
@ -56,6 +58,7 @@ public class PeerProfile {
_expanded = false; _expanded = false;
_speedValue = 0; _speedValue = 0;
_reliabilityValue = 0; _reliabilityValue = 0;
_capacityValue = 0;
_integrationValue = 0; _integrationValue = 0;
_isFailing = false; _isFailing = false;
_peer = peer; _peer = peer;
@ -151,6 +154,14 @@ public class PeerProfile {
public long getReliabilityBonus() { return _reliabilityBonus; } public long getReliabilityBonus() { return _reliabilityBonus; }
public void setReliabilityBonus(long bonus) { _reliabilityBonus = bonus; } public void setReliabilityBonus(long bonus) { _reliabilityBonus = bonus; }
/**
* extra factor added to the capacity ranking - this can be updated in the profile
* written to disk to affect how the algorithm ranks capacity. Negative values are
* penalties
*/
public double getCapacityBonus() { return _capacityBonus; }
public void setCapacityBonus(long bonus) { _capacityBonus = bonus; }
/** /**
* extra factor added to the integration ranking - this can be updated in the profile * extra factor added to the integration ranking - this can be updated in the profile
* written to disk to affect how the algorithm ranks integration. Negative values are * written to disk to affect how the algorithm ranks integration. Negative values are
@ -173,6 +184,11 @@ public class PeerProfile {
* *
*/ */
public double getReliabilityValue() { return _reliabilityValue; } public double getReliabilityValue() { return _reliabilityValue; }
/**
* How many tunnels do we think this peer can handle over the next hour?
*
*/
public double getCapacityValue() { return _capacityValue; }
/** /**
* How well integrated into the network is this peer (as measured by how much they've * How well integrated into the network is this peer (as measured by how much they've
* told us that we didn't already know). Higher numbers means better integrated * told us that we didn't already know). Higher numbers means better integrated
@ -226,7 +242,7 @@ public class PeerProfile {
if (_tunnelTestResponseTime == null) if (_tunnelTestResponseTime == null)
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); _tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_commError == null) if (_commError == null)
_commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } ); _commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", "profile", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbIntroduction == null) if (_dbIntroduction == null)
_dbIntroduction = new RateStat("dbIntroduction", "how many new peers we get from dbSearchReplyMessages or dbStore messages", "profile", new long[] { 60*60*1000l, 24*60*60*1000l, 7*24*60*60*1000l }); _dbIntroduction = new RateStat("dbIntroduction", "how many new peers we get from dbSearchReplyMessages or dbStore messages", "profile", new long[] { 60*60*1000l, 24*60*60*1000l, 7*24*60*60*1000l });
@ -250,18 +266,21 @@ public class PeerProfile {
_tunnelCreateResponseTime.coallesceStats(); _tunnelCreateResponseTime.coallesceStats();
_tunnelTestResponseTime.coallesceStats(); _tunnelTestResponseTime.coallesceStats();
_dbHistory.coallesceStats(); _dbHistory.coallesceStats();
_tunnelHistory.coallesceStats();
_speedValue = calculateSpeed(); _speedValue = calculateSpeed();
_reliabilityValue = calculateReliability(); _reliabilityValue = calculateReliability();
_capacityValue = calculateCapacity();
_integrationValue = calculateIntegration(); _integrationValue = calculateIntegration();
_isFailing = calculateIsFailing(); _isFailing = calculateIsFailing();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Coallesced: speed [" + _speedValue + "] reliability [" + _reliabilityValue + "] integration [" + _integrationValue + "] failing? [" + _isFailing + "]"); _log.debug("Coallesced: speed [" + _speedValue + "] reliability [" + _reliabilityValue + "] capacity [" + _capacityValue + "] integration [" + _integrationValue + "] failing? [" + _isFailing + "]");
} }
private double calculateSpeed() { return _context.speedCalculator().calc(this); } private double calculateSpeed() { return _context.speedCalculator().calc(this); }
private double calculateReliability() { return _context.reliabilityCalculator().calc(this); } private double calculateReliability() { return _context.reliabilityCalculator().calc(this); }
private double calculateCapacity() { return _context.capacityCalculator().calc(this); }
private double calculateIntegration() { return _context.integrationCalculator().calc(this); } private double calculateIntegration() { return _context.integrationCalculator().calc(this); }
private boolean calculateIsFailing() { return _context.isFailingCalculator().calcBoolean(this); } private boolean calculateIsFailing() { return _context.isFailingCalculator().calcBoolean(this); }
void setIsFailing(boolean val) { _isFailing = val; } void setIsFailing(boolean val) { _isFailing = val; }
@ -314,6 +333,7 @@ public class PeerProfile {
buf.append("Peer " + profile.getPeer().toBase64() buf.append("Peer " + profile.getPeer().toBase64()
+ ":\t Speed:\t" + fmt.format(profile.calculateSpeed()) + ":\t Speed:\t" + fmt.format(profile.calculateSpeed())
+ " Reliability:\t" + fmt.format(profile.calculateReliability()) + " Reliability:\t" + fmt.format(profile.calculateReliability())
+ " Capacity:\t" + fmt.format(profile.calculateCapacity())
+ " Integration:\t" + fmt.format(profile.calculateIntegration()) + " Integration:\t" + fmt.format(profile.calculateIntegration())
+ " Active?\t" + profile.getIsActive() + " Active?\t" + profile.getIsActive()
+ " Failing?\t" + profile.calculateIsFailing() + " Failing?\t" + profile.calculateIsFailing()


@ -230,6 +230,10 @@ public class PeerTestJob extends JobImpl {
return false; return false;
} }
} }
private boolean getShouldFailTunnels() { return true; }
/** /**
* Called when the peer's response is found * Called when the peer's response is found
*/ */
@ -256,22 +260,25 @@ public class PeerTestJob extends JobImpl {
+ _replyTunnel.getTunnelId().getTunnelId()); + _replyTunnel.getTunnelId().getTunnelId());
getContext().profileManager().dbLookupSuccessful(_peer.getIdentity().getHash(), responseTime); getContext().profileManager().dbLookupSuccessful(_peer.getIdentity().getHash(), responseTime);
_sendTunnel.setLastTested(getContext().clock().now()); // only honor success if we also honor failure
_replyTunnel.setLastTested(getContext().clock().now()); if (getShouldFailTunnels()) {
_sendTunnel.setLastTested(getContext().clock().now());
TunnelInfo cur = _replyTunnel; _replyTunnel.setLastTested(getContext().clock().now());
while (cur != null) {
Hash peer = cur.getThisHop(); TunnelInfo cur = _replyTunnel;
if ( (peer != null) && (!getContext().routerHash().equals(peer)) ) while (cur != null) {
getContext().profileManager().tunnelTestSucceeded(peer, responseTime); Hash peer = cur.getThisHop();
cur = cur.getNextHopInfo(); if ( (peer != null) && (!getContext().routerHash().equals(peer)) )
} getContext().profileManager().tunnelTestSucceeded(peer, responseTime);
cur = _sendTunnel; cur = cur.getNextHopInfo();
while (cur != null) { }
Hash peer = cur.getThisHop(); cur = _sendTunnel;
if ( (peer != null) && (!getContext().routerHash().equals(peer)) ) while (cur != null) {
getContext().profileManager().tunnelTestSucceeded(peer, responseTime); Hash peer = cur.getThisHop();
cur = cur.getNextHopInfo(); if ( (peer != null) && (!getContext().routerHash().equals(peer)) )
getContext().profileManager().tunnelTestSucceeded(peer, responseTime);
cur = cur.getNextHopInfo();
}
} }
} }
@ -294,7 +301,6 @@ public class PeerTestJob extends JobImpl {
_sendTunnel = sendTunnel; _sendTunnel = sendTunnel;
} }
public String getName() { return "Peer test failed"; } public String getName() { return "Peer test failed"; }
private boolean getShouldFailTunnels() { return true; }
private boolean getShouldFailPeer() { return true; } private boolean getShouldFailPeer() { return true; }
public void runJob() { public void runJob() {
if (getShouldFailPeer()) if (getShouldFailPeer())
@ -307,7 +313,6 @@ public class PeerTestJob extends JobImpl {
+ _replyTunnel.getTunnelId().getTunnelId()); + _replyTunnel.getTunnelId().getTunnelId());
if (getShouldFailTunnels()) { if (getShouldFailTunnels()) {
_sendTunnel.setLastTested(getContext().clock().now()); _sendTunnel.setLastTested(getContext().clock().now());
_replyTunnel.setLastTested(getContext().clock().now()); _replyTunnel.setLastTested(getContext().clock().now());
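In the same spirit, a sketch of the reworked success handler from the hunk above (names come from the hunk; logging and the surrounding reply selector class are omitted) - the tunnels and their hops are only credited when the job would also have penalized them on failure, so the profiles see both sides of the test:

        if (getShouldFailTunnels()) {
            _sendTunnel.setLastTested(getContext().clock().now());
            _replyTunnel.setLastTested(getContext().clock().now());
            // credit every hop in the reply tunnel except ourselves
            TunnelInfo cur = _replyTunnel;
            while (cur != null) {
                Hash peer = cur.getThisHop();
                if ( (peer != null) && (!getContext().routerHash().equals(peer)) )
                    getContext().profileManager().tunnelTestSucceeded(peer, responseTime);
                cur = cur.getNextHopInfo();
            }
            // ... then the same walk is repeated over _sendTunnel
        }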


@ -258,7 +258,7 @@ public class ProfileManagerImpl implements ProfileManager {
Set peers = new HashSet(numPeers); Set peers = new HashSet(numPeers);
// lets get the fastest ones we've got (this fails over to include just plain reliable, // lets get the fastest ones we've got (this fails over to include just plain reliable,
// or even notFailing peers if there aren't enough fast ones) // or even notFailing peers if there aren't enough fast ones)
_context.profileOrganizer().selectFastAndReliablePeers(numPeers, null, peers); _context.profileOrganizer().selectFastPeers(numPeers, null, peers);
Properties props = new Properties(); Properties props = new Properties();
for (Iterator iter = peers.iterator(); iter.hasNext(); ) { for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next(); Hash peer = (Hash)iter.next();
@ -268,10 +268,10 @@ public class ProfileManagerImpl implements ProfileManager {
StringBuffer buf = new StringBuffer(64); StringBuffer buf = new StringBuffer(64);
buf.append("status: "); buf.append("status: ");
if (_context.profileOrganizer().isFastAndReliable(peer)) { if (_context.profileOrganizer().isFast(peer)) {
buf.append("fastReliable"); buf.append("fast");
} else if (_context.profileOrganizer().isReliable(peer)) { } else if (_context.profileOrganizer().isHighCapacity(peer)) {
buf.append("reliable"); buf.append("highCapacity");
} else if (_context.profileOrganizer().isFailing(peer)) { } else if (_context.profileOrganizer().isFailing(peer)) {
buf.append("failing"); buf.append("failing");
} else { } else {
@ -283,7 +283,7 @@ public class ProfileManagerImpl implements ProfileManager {
else else
buf.append(" "); buf.append(" ");
buf.append("reliability: ").append(num(prof.getReliabilityValue())).append(" "); buf.append("capacity: ").append(num(prof.getCapacityValue())).append(" ");
buf.append("speed: ").append(num(prof.getSpeedValue())).append(" "); buf.append("speed: ").append(num(prof.getSpeedValue())).append(" ");
buf.append("integration: ").append(num(prof.getIntegrationValue())); buf.append("integration: ").append(num(prof.getIntegrationValue()));


@ -32,10 +32,10 @@ import net.i2p.util.Log;
public class ProfileOrganizer { public class ProfileOrganizer {
private Log _log; private Log _log;
private RouterContext _context; private RouterContext _context;
/** H(routerIdentity) to PeerProfile for all peers that are fast and reliable */ /** H(routerIdentity) to PeerProfile for all peers that are fast and high capacity*/
private Map _fastAndReliablePeers; private Map _fastPeers;
/** H(routerIdentity) to PeerProfile for all peers that are reliable */ /** H(routerIdentity) to PeerProfile for all peers that have high capacities */
private Map _reliablePeers; private Map _highCapacityPeers;
/** H(routerIdentity) to PeerProfile for all peers that well integrated into the network and not failing horribly */ /** H(routerIdentity) to PeerProfile for all peers that well integrated into the network and not failing horribly */
private Map _wellIntegratedPeers; private Map _wellIntegratedPeers;
/** H(routerIdentity) to PeerProfile for all peers that are not failing horribly */ /** H(routerIdentity) to PeerProfile for all peers that are not failing horribly */
@ -46,27 +46,18 @@ public class ProfileOrganizer {
private Hash _us; private Hash _us;
private ProfilePersistenceHelper _persistenceHelper; private ProfilePersistenceHelper _persistenceHelper;
/** PeerProfile objects for all peers profiled, orderd by most reliable first */ /** PeerProfile objects for all peers profiled, orderd by the ones with the highest capacity first */
private Set _strictReliabilityOrder; private Set _strictCapacityOrder;
/** threshold speed value, seperating fast from slow */ /** threshold speed value, seperating fast from slow */
private double _thresholdSpeedValue; private double _thresholdSpeedValue;
/** threshold reliability value, seperating reliable from unreliable */ /** threshold reliability value, seperating reliable from unreliable */
private double _thresholdReliabilityValue; private double _thresholdCapacityValue;
/** integration value, seperating well integrated from not well integrated */ /** integration value, seperating well integrated from not well integrated */
private double _thresholdIntegrationValue; private double _thresholdIntegrationValue;
private InverseReliabilityComparator _calc; private InverseCapacityComparator _comp;
/**
* Defines what percentage of the average reliability will be used as the
* reliability threshold. For example, .5 means all peers with the reliability
* greater than half of the average will be considered "reliable".
*
*/
public static final String PROP_RELIABILITY_THRESHOLD_FACTOR = "profileOrganizer.reliabilityThresholdFactor";
public static final double DEFAULT_RELIABILITY_THRESHOLD_FACTOR = .5d;
/** /**
* Defines the minimum number of 'fast' peers that the organizer should select. See * Defines the minimum number of 'fast' peers that the organizer should select. See
* {@link ProfileOrganizer#getMinimumFastPeers} * {@link ProfileOrganizer#getMinimumFastPeers}
@ -84,28 +75,28 @@ public class ProfileOrganizer {
public ProfileOrganizer(RouterContext context) { public ProfileOrganizer(RouterContext context) {
_context = context; _context = context;
_log = context.logManager().getLog(ProfileOrganizer.class); _log = context.logManager().getLog(ProfileOrganizer.class);
_calc = new InverseReliabilityComparator(); _comp = new InverseCapacityComparator();
_fastAndReliablePeers = new HashMap(16); _fastPeers = new HashMap(16);
_reliablePeers = new HashMap(16); _highCapacityPeers = new HashMap(16);
_wellIntegratedPeers = new HashMap(16); _wellIntegratedPeers = new HashMap(16);
_notFailingPeers = new HashMap(16); _notFailingPeers = new HashMap(16);
_failingPeers = new HashMap(16); _failingPeers = new HashMap(16);
_strictReliabilityOrder = new TreeSet(_calc); _strictCapacityOrder = new TreeSet(_comp);
_thresholdSpeedValue = 0.0d; _thresholdSpeedValue = 0.0d;
_thresholdReliabilityValue = 0.0d; _thresholdCapacityValue = 0.0d;
_thresholdIntegrationValue = 0.0d; _thresholdIntegrationValue = 0.0d;
_persistenceHelper = new ProfilePersistenceHelper(_context); _persistenceHelper = new ProfilePersistenceHelper(_context);
} }
/** /**
* Order profiles by their reliability, but backwards (most reliable / highest value first). * Order profiles by their capacity, but backwards (highest capacity / value first).
* *
*/ */
private final class InverseReliabilityComparator implements Comparator { private final class InverseCapacityComparator implements Comparator {
/** /**
* Compare the two objects backwards. The standard comparator returns * Compare the two objects backwards. The standard comparator returns
* -1 if lhs is less than rhs, 1 if lhs is greater than rhs, or 0 if they're * -1 if lhs is less than rhs, 1 if lhs is greater than rhs, or 0 if they're
* equal. To keep a strict ordering, we measure peers with equal reliability * equal. To keep a strict ordering, we measure peers with equal capacity
* values according to their hashes * values according to their hashes
* *
* @return -1 if the right hand side is smaller, 1 if the left hand side is * @return -1 if the right hand side is smaller, 1 if the left hand side is
@ -117,8 +108,8 @@ public class ProfileOrganizer {
PeerProfile left = (PeerProfile)lhs; PeerProfile left = (PeerProfile)lhs;
PeerProfile right= (PeerProfile)rhs; PeerProfile right= (PeerProfile)rhs;
double rval = right.getReliabilityValue(); double rval = right.getCapacityValue();
double lval = left.getReliabilityValue(); double lval = left.getCapacityValue();
if (lval == rval) // note the following call inverts right and left (see: classname) if (lval == rval) // note the following call inverts right and left (see: classname)
return DataHelper.compareTo(right.getPeer().getData(), left.getPeer().getData()); return DataHelper.compareTo(right.getPeer().getData(), left.getPeer().getData());
@ -126,10 +117,11 @@ public class ProfileOrganizer {
boolean rightBigger = rval > lval; boolean rightBigger = rval > lval;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("The reliability of " + right.getPeer().toBase64() _log.debug("The capacity of " + right.getPeer().toBase64()
+ " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left") + " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
+ " as larger: r=" + right.getReliabilityValue() + " l=" + " as larger: r=" + right.getCapacityValue()
+ left.getReliabilityValue()); + " l="
+ left.getCapacityValue());
if (rightBigger) if (rightBigger)
return 1; return 1;
@ -164,26 +156,26 @@ public class ProfileOrganizer {
PeerProfile old = locked_getProfile(profile.getPeer()); PeerProfile old = locked_getProfile(profile.getPeer());
profile.coallesceStats(); profile.coallesceStats();
locked_placeProfile(profile); locked_placeProfile(profile);
_strictReliabilityOrder.add(profile); _strictCapacityOrder.add(profile);
return old; return old;
} }
} }
public int countFastAndReliablePeers() { synchronized (_reorganizeLock) { return _fastAndReliablePeers.size(); } } public int countFastPeers() { synchronized (_reorganizeLock) { return _fastPeers.size(); } }
public int countReliablePeers() { synchronized (_reorganizeLock) { return _reliablePeers.size(); } } public int countHighCapacityPeers() { synchronized (_reorganizeLock) { return _highCapacityPeers.size(); } }
public int countWellIntegratedPeers() { synchronized (_reorganizeLock) { return _wellIntegratedPeers.size(); } } public int countWellIntegratedPeers() { synchronized (_reorganizeLock) { return _wellIntegratedPeers.size(); } }
public int countNotFailingPeers() { synchronized (_reorganizeLock) { return _notFailingPeers.size(); } } public int countNotFailingPeers() { synchronized (_reorganizeLock) { return _notFailingPeers.size(); } }
public int countFailingPeers() { synchronized (_reorganizeLock) { return _failingPeers.size(); } } public int countFailingPeers() { synchronized (_reorganizeLock) { return _failingPeers.size(); } }
public boolean isFastAndReliable(Hash peer) { synchronized (_reorganizeLock) { return _fastAndReliablePeers.containsKey(peer); } } public boolean isFast(Hash peer) { synchronized (_reorganizeLock) { return _fastPeers.containsKey(peer); } }
public boolean isReliable(Hash peer) { synchronized (_reorganizeLock) { return _reliablePeers.containsKey(peer); } } public boolean isHighCapacity(Hash peer) { synchronized (_reorganizeLock) { return _highCapacityPeers.containsKey(peer); } }
public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } } public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } }
public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } } public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } }
/** /**
* Return a set of Hashes for peers that are both fast and reliable. If an insufficient * Return a set of Hashes for peers that are both fast and reliable. If an insufficient
* number of peers are both fast and reliable, fall back onto reliable peers, and if reliable * number of peers are both fast and reliable, fall back onto high capacity peers, and if that
* peers doesn't contain sufficient peers, fall back onto not failing peers, and even THAT doesn't * doesn't contain sufficient peers, fall back onto not failing peers, and even THAT doesn't
* have sufficient peers, fall back onto failing peers. * have sufficient peers, fall back onto failing peers.
* *
* @param howMany how many peers are desired * @param howMany how many peers are desired
@ -191,22 +183,26 @@ public class ProfileOrganizer {
* @param matches set to store the return value in * @param matches set to store the return value in
* *
*/ */
public void selectFastAndReliablePeers(int howMany, Set exclude, Set matches) { public void selectFastPeers(int howMany, Set exclude, Set matches) {
synchronized (_reorganizeLock) { synchronized (_reorganizeLock) {
locked_selectPeers(_fastAndReliablePeers, howMany, exclude, matches); locked_selectPeers(_fastPeers, howMany, exclude, matches);
} }
if (matches.size() < howMany) if (matches.size() < howMany)
selectReliablePeers(howMany, exclude, matches); selectHighCapacityPeers(howMany, exclude, matches);
return; return;
} }
/** /**
* Return a set of Hashes for peers that are reliable. * Return a set of Hashes for peers that have a high capacity
* *
*/ */
public void selectReliablePeers(int howMany, Set exclude, Set matches) { public void selectHighCapacityPeers(int howMany, Set exclude, Set matches) {
synchronized (_reorganizeLock) { synchronized (_reorganizeLock) {
locked_selectPeers(_reliablePeers, howMany, exclude, matches); // we only use selectHighCapacityPeers when we are selecting for PURPOSE_TEST
// or we are falling back due to _fastPeers being too small, so we can always
// exclude the fast peers
exclude.addAll(_fastPeers.keySet());
locked_selectPeers(_highCapacityPeers, howMany, exclude, matches);
} }
if (matches.size() < howMany) if (matches.size() < howMany)
selectNotFailingPeers(howMany, exclude, matches); selectNotFailingPeers(howMany, exclude, matches);
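Untangled, the new high capacity selection reads roughly as follows (the null guard on exclude is an addition of this sketch; the hunk calls addAll unconditionally):

    public void selectHighCapacityPeers(int howMany, Set exclude, Set matches) {
        synchronized (_reorganizeLock) {
            // we only get here for PURPOSE_TEST or when falling back because the
            // fast group is too small, so the fast peers can always be excluded
            if (exclude == null)
                exclude = new HashSet();
            exclude.addAll(_fastPeers.keySet());
            locked_selectPeers(_highCapacityPeers, howMany, exclude, matches);
        }
        if (matches.size() < howMany)
            selectNotFailingPeers(howMany, exclude, matches);
    }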
@ -274,7 +270,7 @@ public class ProfileOrganizer {
int needed = howMany - orig; int needed = howMany - orig;
List selected = new ArrayList(needed); List selected = new ArrayList(needed);
synchronized (_reorganizeLock) { synchronized (_reorganizeLock) {
for (Iterator iter = _strictReliabilityOrder.iterator(); selected.size() < needed && iter.hasNext(); ) { for (Iterator iter = _strictCapacityOrder.iterator(); selected.size() < needed && iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next(); PeerProfile prof = (PeerProfile)iter.next();
if (matches.contains(prof.getPeer()) || if (matches.contains(prof.getPeer()) ||
(exclude != null && exclude.contains(prof.getPeer())) || (exclude != null && exclude.contains(prof.getPeer())) ||
@ -309,11 +305,11 @@ public class ProfileOrganizer {
*/ */
public Set selectAllPeers() { public Set selectAllPeers() {
synchronized (_reorganizeLock) { synchronized (_reorganizeLock) {
Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _reliablePeers.size() + _fastAndReliablePeers.size()); Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
allPeers.addAll(_failingPeers.keySet()); allPeers.addAll(_failingPeers.keySet());
allPeers.addAll(_notFailingPeers.keySet()); allPeers.addAll(_notFailingPeers.keySet());
allPeers.addAll(_reliablePeers.keySet()); allPeers.addAll(_highCapacityPeers.keySet());
allPeers.addAll(_fastAndReliablePeers.keySet()); allPeers.addAll(_fastPeers.keySet());
return allPeers; return allPeers;
} }
} }
@ -326,23 +322,23 @@ public class ProfileOrganizer {
*/ */
public void reorganize() { public void reorganize() {
synchronized (_reorganizeLock) { synchronized (_reorganizeLock) {
Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _reliablePeers.size() + _fastAndReliablePeers.size()); Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
allPeers.addAll(_failingPeers.values()); allPeers.addAll(_failingPeers.values());
allPeers.addAll(_notFailingPeers.values()); allPeers.addAll(_notFailingPeers.values());
allPeers.addAll(_reliablePeers.values()); allPeers.addAll(_highCapacityPeers.values());
allPeers.addAll(_fastAndReliablePeers.values()); allPeers.addAll(_fastPeers.values());
_failingPeers.clear(); _failingPeers.clear();
_notFailingPeers.clear(); _notFailingPeers.clear();
_reliablePeers.clear(); _highCapacityPeers.clear();
_fastAndReliablePeers.clear(); _fastPeers.clear();
Set reordered = new TreeSet(_calc); Set reordered = new TreeSet(_comp);
for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) { for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next(); PeerProfile prof = (PeerProfile)iter.next();
reordered.add(prof); reordered.add(prof);
} }
_strictReliabilityOrder = reordered; _strictCapacityOrder = reordered;
calculateThresholds(allPeers); calculateThresholds(allPeers);
@ -355,14 +351,15 @@ public class ProfileOrganizer {
locked_promoteFastAsNecessary(); locked_promoteFastAsNecessary();
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue + ", reliability: " + _thresholdReliabilityValue + ", speed: " + _thresholdSpeedValue + "]"); _log.debug("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue
+ ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]");
StringBuffer buf = new StringBuffer(512); StringBuffer buf = new StringBuffer(512);
for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) { for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next(); PeerProfile prof = (PeerProfile)iter.next();
buf.append('[').append(prof.toString()).append('=').append(prof.getReliabilityValue()).append("] "); buf.append('[').append(prof.toString()).append('=').append(prof.getCapacityValue()).append("] ");
} }
_log.debug("Strictly organized (most reliable first): " + buf.toString()); _log.debug("Strictly organized (highest capacity first): " + buf.toString());
_log.debug("fast and reliable: " + _fastAndReliablePeers.values()); _log.debug("fast: " + _fastPeers.values());
} }
} }
} }
@ -370,23 +367,23 @@ public class ProfileOrganizer {
/** /**
* As with locked_unfailAsNecessary, I'm not sure how much I like this - if there * As with locked_unfailAsNecessary, I'm not sure how much I like this - if there
* aren't enough fast peers, move some of the not-so-fast peers into the fast group. * aren't enough fast peers, move some of the not-so-fast peers into the fast group.
* This picks the not-so-fast peers based on reliability, not speed, and skips over any * This picks the not-so-fast peers based on capacity, not speed, and skips over any
* failing peers. Perhaps it should build a seperate strict ordering by speed? Nah, not * failing peers. Perhaps it should build a seperate strict ordering by speed? Nah, not
* worth the maintenance and memory overhead, at least not for now. * worth the maintenance and memory overhead, at least not for now.
* *
*/ */
private void locked_promoteFastAsNecessary() { private void locked_promoteFastAsNecessary() {
int minFastPeers = getMinimumFastPeers(); int minFastPeers = getMinimumFastPeers();
int numToPromote = minFastPeers - _fastAndReliablePeers.size(); int numToPromote = minFastPeers - _fastPeers.size();
if (numToPromote > 0) { if (numToPromote > 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Need to explicitly promote " + numToPromote + " peers to the fast+reliable group"); _log.debug("Need to explicitly promote " + numToPromote + " peers to the fast group");
for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) { for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile cur = (PeerProfile)iter.next(); PeerProfile cur = (PeerProfile)iter.next();
if ( (!_fastAndReliablePeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) { if ( (!_fastPeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) {
_fastAndReliablePeers.put(cur.getPeer(), cur); _fastPeers.put(cur.getPeer(), cur);
// no need to remove it from any of the other groups, since if it is // no need to remove it from any of the other groups, since if it is
// fast and reliable, it is reliable, and it is not failing // fast, it has a high capacity, and it is not failing
numToPromote--; numToPromote--;
if (numToPromote <= 0) if (numToPromote <= 0)
break; break;
@ -421,7 +418,7 @@ public class ProfileOrganizer {
int needToUnfail = MIN_NOT_FAILING_ACTIVE - notFailingActive; int needToUnfail = MIN_NOT_FAILING_ACTIVE - notFailingActive;
if (needToUnfail > 0) { if (needToUnfail > 0) {
int unfailed = 0; int unfailed = 0;
for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) { for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile best = (PeerProfile)iter.next(); PeerProfile best = (PeerProfile)iter.next();
if ( (best.getIsActive()) && (best.getIsFailing()) ) { if ( (best.getIsActive()) && (best.getIsFailing()) ) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -436,19 +433,23 @@ public class ProfileOrganizer {
} }
} }
public double getSpeedThreshold() { return _thresholdSpeedValue; }
public double getCapacityThreshold() { return _thresholdCapacityValue; }
//////// ////////
// no more public stuff below // no more public stuff below
//////// ////////
/** /**
* Update the thresholds based on the profiles in this set. currently * Update the thresholds based on the profiles in this set. currently
* implements the reliability threshold based on the median reliability (ignoring * implements the capacity threshold based on the median capacity (ignoring
* failing peers) with integration and speed thresholds being derived from the average * failing or inactive peers), using the median speed from that group to
* of the active reliable peers. * define the speed threshold, and use the mean integration value from the
* high capacity group to define the integration threshold.
* *
*/ */
private void calculateThresholds(Set allPeers) { private void calculateThresholds(Set allPeers) {
Set reordered = new TreeSet(_calc); Set reordered = new TreeSet(_comp);
for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) { for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next(); PeerProfile profile = (PeerProfile)iter.next();
@ -460,42 +461,54 @@ public class ProfileOrganizer {
reordered.add(profile); reordered.add(profile);
} }
int numNotFailing = reordered.size();
// how many are in the "top half" of the reliability peers?
int topCount = 0;
if (numNotFailing != 0)
topCount = (int)(numNotFailing / 2);
if (_log.shouldLog(Log.DEBUG))
_log.debug("top count is " + topCount + " out of " + numNotFailing);
int numNotFailing = reordered.size();
// how many are in the "top half" of the high capacity peers?
int i = 0;
int threshold = 0;
if (numNotFailing > 0)
threshold = numNotFailing / 2;
for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
PeerProfile profile = (PeerProfile)iter.next();
if (i >= threshold) {
_thresholdCapacityValue = profile.getCapacityValue();
break;
}
}
Set speeds = new TreeSet();
int numActive = 0; int numActive = 0;
double totalIntegration = 0; double totalIntegration = 0;
double totalSpeed = 0; double totalSpeed = 0;
int i = 0;
for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) { for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
PeerProfile profile = (PeerProfile)iter.next(); PeerProfile profile = (PeerProfile)iter.next();
if (i < topCount) { if (profile.getCapacityValue() >= _thresholdCapacityValue) {
if (profile.getIsActive()) { if (profile.getIsActive()) {
numActive++; numActive++;
if (profile.getIntegrationValue() > 0) if (profile.getIntegrationValue() > 0)
totalIntegration += profile.getIntegrationValue(); totalIntegration += profile.getIntegrationValue();
if (profile.getSpeedValue() > 0) // duplicates being clobbered is fine by us
totalSpeed += profile.getSpeedValue(); speeds.add(new Double(0-profile.getSpeedValue()));
} }
} else if (i == topCount) {
if (profile.getReliabilityValue() < 0)
_thresholdReliabilityValue = 0;
else
_thresholdReliabilityValue = profile.getReliabilityValue();
break;
} else { } else {
// its ordered
break; break;
} }
} }
// calc the median speed of high capacity peers
i = 0;
for (Iterator iter = speeds.iterator(); iter.hasNext(); i++) {
Double speed = (Double)iter.next();
if (i >= (speeds.size() / 2)) {
_thresholdSpeedValue = 0-speed.doubleValue();
break;
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Threshold value for speed: " + _thresholdSpeedValue + " with speeds: " + speeds);
_thresholdIntegrationValue = 1.0d * avg(totalIntegration, numActive); _thresholdIntegrationValue = 1.0d * avg(totalIntegration, numActive);
_thresholdSpeedValue = 1.0d * avg(totalSpeed, numActive);
} }
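The hunk above interleaves the removed and the replacement threshold logic; condensed, the new calculateThresholds works roughly like this (a sketch - the filtering of failing peers while filling 'reordered' is elided, and the now unused totalSpeed accumulator is left out):

    private void calculateThresholds(Set allPeers) {
        Set reordered = new TreeSet(_comp);     // highest capacity first
        // (failing peers are skipped when filling 'reordered' from allPeers - elided here)
        // capacity threshold: the median capacity of the non-failing peers
        int i = 0;
        int mid = reordered.size() / 2;
        for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
            PeerProfile profile = (PeerProfile)iter.next();
            if (i >= mid) { _thresholdCapacityValue = profile.getCapacityValue(); break; }
        }
        // among active peers at or above that capacity, gather speeds (negated so the
        // TreeSet iterates fastest first) and sum their positive integration values
        Set speeds = new TreeSet();
        int numActive = 0;
        double totalIntegration = 0;
        for (Iterator iter = reordered.iterator(); iter.hasNext(); ) {
            PeerProfile profile = (PeerProfile)iter.next();
            if (profile.getCapacityValue() < _thresholdCapacityValue) break;  // ordered, so stop here
            if (!profile.getIsActive()) continue;
            numActive++;
            if (profile.getIntegrationValue() > 0) totalIntegration += profile.getIntegrationValue();
            speeds.add(new Double(0 - profile.getSpeedValue()));
        }
        // speed threshold: the median speed of that group; integration threshold: its mean
        i = 0;
        for (Iterator iter = speeds.iterator(); iter.hasNext(); i++) {
            Double speed = (Double)iter.next();
            if (i >= speeds.size() / 2) { _thresholdSpeedValue = 0 - speed.doubleValue(); break; }
        }
        _thresholdIntegrationValue = avg(totalIntegration, numActive);
    }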
/** simple average, or 0 if NaN */ /** simple average, or 0 if NaN */
@ -543,23 +556,23 @@ public class ProfileOrganizer {
if (profile.getIsFailing()) { if (profile.getIsFailing()) {
if (!shouldDrop(profile)) if (!shouldDrop(profile))
_failingPeers.put(profile.getPeer(), profile); _failingPeers.put(profile.getPeer(), profile);
_fastAndReliablePeers.remove(profile.getPeer()); _fastPeers.remove(profile.getPeer());
_reliablePeers.remove(profile.getPeer()); _highCapacityPeers.remove(profile.getPeer());
_wellIntegratedPeers.remove(profile.getPeer()); _wellIntegratedPeers.remove(profile.getPeer());
_notFailingPeers.remove(profile.getPeer()); _notFailingPeers.remove(profile.getPeer());
} else { } else {
_failingPeers.remove(profile.getPeer()); _failingPeers.remove(profile.getPeer());
_fastAndReliablePeers.remove(profile.getPeer()); _fastPeers.remove(profile.getPeer());
_reliablePeers.remove(profile.getPeer()); _highCapacityPeers.remove(profile.getPeer());
_wellIntegratedPeers.remove(profile.getPeer()); _wellIntegratedPeers.remove(profile.getPeer());
_notFailingPeers.put(profile.getPeer(), profile); _notFailingPeers.put(profile.getPeer(), profile);
if (_thresholdReliabilityValue <= profile.getReliabilityValue()) { if (_thresholdCapacityValue <= profile.getCapacityValue()) {
_reliablePeers.put(profile.getPeer(), profile); _highCapacityPeers.put(profile.getPeer(), profile);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reliable: \t" + profile.getPeer().toBase64()); _log.debug("High capacity: \t" + profile.getPeer().toBase64());
if (_thresholdSpeedValue <= profile.getSpeedValue()) { if (_thresholdSpeedValue <= profile.getSpeedValue()) {
_fastAndReliablePeers.put(profile.getPeer(), profile); _fastPeers.put(profile.getPeer(), profile);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Fast: \t" + profile.getPeer().toBase64()); _log.debug("Fast: \t" + profile.getPeer().toBase64());
} }
@ -570,7 +583,7 @@ public class ProfileOrganizer {
_log.debug("Integrated: \t" + profile.getPeer().toBase64()); _log.debug("Integrated: \t" + profile.getPeer().toBase64());
} }
} else { } else {
// not reliable, but not failing (yet) // not high capacity, but not failing (yet)
} }
} }
} }
@ -615,9 +628,10 @@ public class ProfileOrganizer {
buf.append("<td><b>Peer</b> (").append(order.size()).append(", hiding ").append(peers.size()-order.size()).append(" inactive ones)</td>"); buf.append("<td><b>Peer</b> (").append(order.size()).append(", hiding ").append(peers.size()-order.size()).append(" inactive ones)</td>");
buf.append("<td><b>Groups</b></td>"); buf.append("<td><b>Groups</b></td>");
buf.append("<td><b>Speed</b></td>"); buf.append("<td><b>Speed</b></td>");
buf.append("<td><b>Reliability</b></td>"); buf.append("<td><b>Capacity</b></td>");
buf.append("<td><b>Integration</b></td>"); buf.append("<td><b>Integration</b></td>");
buf.append("<td><b>Failing?</b></td>"); buf.append("<td><b>Failing?</b></td>");
buf.append("<td><b>Reliability (deprecated)</b></td>");
buf.append("<td><b>Profile data</b></td>"); buf.append("<td><b>Profile data</b></td>");
buf.append("</tr>"); buf.append("</tr>");
for (Iterator iter = order.keySet().iterator(); iter.hasNext();) { for (Iterator iter = order.keySet().iterator(); iter.hasNext();) {
@ -641,11 +655,11 @@ public class ProfileOrganizer {
int tier = 0; int tier = 0;
boolean isIntegrated = false; boolean isIntegrated = false;
synchronized (_reorganizeLock) { synchronized (_reorganizeLock) {
if (_fastAndReliablePeers.containsKey(peer)) { if (_fastPeers.containsKey(peer)) {
tier = 1; tier = 1;
fast++; fast++;
reliable++; reliable++;
} else if (_reliablePeers.containsKey(peer)) { } else if (_highCapacityPeers.containsKey(peer)) {
tier = 2; tier = 2;
reliable++; reliable++;
} else if (_notFailingPeers.containsKey(peer)) { } else if (_notFailingPeers.containsKey(peer)) {
@ -661,76 +675,40 @@ public class ProfileOrganizer {
} }
switch (tier) { switch (tier) {
case 1: buf.append("Fast+Reliable"); break; case 1: buf.append("Fast+High Capacity"); break;
case 2: buf.append("Reliable"); break; case 2: buf.append("High Capacity"); break;
case 3: buf.append("Not Failing"); break; case 3: buf.append("Not Failing"); break;
default: buf.append("Failing"); break; default: buf.append("Failing"); break;
} }
if (isIntegrated) buf.append(", Well integrated"); if (isIntegrated) buf.append(", Well integrated");
buf.append("<td align=\"right\">").append(num(prof.getSpeedValue())).append("</td>"); buf.append("<td align=\"right\">").append(num(prof.getSpeedValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getReliabilityValue())).append("</td>"); buf.append("<td align=\"right\">").append(num(prof.getCapacityValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getIntegrationValue())).append("</td>"); buf.append("<td align=\"right\">").append(num(prof.getIntegrationValue())).append("</td>");
buf.append("<td align=\"right\">").append(prof.getIsFailing()).append("</td>"); buf.append("<td align=\"right\">").append(prof.getIsFailing()).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getReliabilityValue())).append("</td>");
buf.append("<td><a href=\"/profile/").append(prof.getPeer().toBase64().substring(0, 32)).append("\">profile.txt</a> "); buf.append("<td><a href=\"/profile/").append(prof.getPeer().toBase64().substring(0, 32)).append("\">profile.txt</a> ");
buf.append(" <a href=\"#").append(prof.getPeer().toBase64().substring(0, 32)).append("\">netDb</a></td>"); buf.append(" <a href=\"#").append(prof.getPeer().toBase64().substring(0, 32)).append("\">netDb</a></td>");
buf.append("</tr>"); buf.append("</tr>");
} }
buf.append("</table>"); buf.append("</table>");
buf.append("<i>Note that the speed, reliability, and integration values are relative"); buf.append("<i>Definitions:<ul>");
buf.append(" - they do NOT correspond with any particular throughput, latency, uptime, "); buf.append("<li><b>speed</b>: how many round trip messages can we pump through the peer per minute?</li>");
buf.append("or other metric. Higher numbers are better. "); buf.append("<li><b>capacity</b>: how many tunnels can we ask them to join in an hour?</li>");
buf.append("<li><b>integration</b>: how many new peers have they told us about lately?</li>");
buf.append("<li><b>failing?</b>: is the peer currently swamped (and if possible we should avoid nagging them)?</li>");
buf.append("<li><b>reliability</b>: no sound semantics... just a random kludge of a value.</li>");
buf.append("</ul></i>");
buf.append("Red peers prefixed with '--' means the peer is failing, and blue peers prefixed "); buf.append("Red peers prefixed with '--' means the peer is failing, and blue peers prefixed ");
buf.append("with '++' means we've sent or received a message from them "); buf.append("with '++' means we've sent or received a message from them ");
buf.append("in the last five minutes</i><br />"); buf.append("in the last five minutes</i><br />");
buf.append("<b>Thresholds:</b><br />"); buf.append("<b>Thresholds:</b><br />");
buf.append("<b>Speed:</b> ").append(num(_thresholdSpeedValue)).append(" (").append(fast).append(" fast peers)<br />"); buf.append("<b>Speed:</b> ").append(num(_thresholdSpeedValue)).append(" (").append(fast).append(" fast peers)<br />");
buf.append("<b>Reliability:</b> ").append(num(_thresholdReliabilityValue)).append(" (").append(reliable).append(" reliable peers)<br />"); buf.append("<b>Capacity:</b> ").append(num(_thresholdCapacityValue)).append(" (").append(reliable).append(" high capacity peers)<br />");
buf.append("<b>Integration:</b> ").append(num(_thresholdIntegrationValue)).append(" (").append(integrated).append(" well integrated peers)<br />"); buf.append("<b>Integration:</b> ").append(num(_thresholdIntegrationValue)).append(" (").append(integrated).append(" well integrated peers)<br />");
out.write(buf.toString().getBytes()); out.write(buf.toString().getBytes());
} }
/**
* How much should we shrink (or grow) the average reliability to determine the
* threshold - numbers greater than 1 increase the threshold, less than 1 decrease
* it. This can be changed during runtime by updating the router.config
*
* @return factor to multiply the average reliability with to determine the threshold
*/
private double getReliabilityThresholdFactor() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_RELIABILITY_THRESHOLD_FACTOR);
if (val != null) {
try {
double rv = Double.parseDouble(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_RELIABILITY_THRESHOLD_FACTOR + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Reliability threshold factor improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_RELIABILITY_THRESHOLD_FACTOR, ""+DEFAULT_RELIABILITY_THRESHOLD_FACTOR);
if (val != null) {
try {
double rv = Double.parseDouble(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router context said " + PROP_RELIABILITY_THRESHOLD_FACTOR+ '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Reliability threshold factor improperly set in the router environment [" + val + "]", nfe);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("no config for " + PROP_RELIABILITY_THRESHOLD_FACTOR + ", using " + DEFAULT_RELIABILITY_THRESHOLD_FACTOR);
return DEFAULT_RELIABILITY_THRESHOLD_FACTOR;
}
/** /**
* Defines the minimum number of 'fast' peers that the organizer should select. If * Defines the minimum number of 'fast' peers that the organizer should select. If
* the profile calculators derive a threshold that does not select at least this many peers, * the profile calculators derive a threshold that does not select at least this many peers,
@ -738,7 +716,7 @@ public class ProfileOrganizer {
* This parameter should help deal with a lack of diversity in the tunnels created when some * This parameter should help deal with a lack of diversity in the tunnels created when some
* peers are particularly fast. * peers are particularly fast.
* *
* @return minimum number of peers to be placed in the 'fast+reliable' group * @return minimum number of peers to be placed in the 'fast' group
*/ */
protected int getMinimumFastPeers() { protected int getMinimumFastPeers() {
if (_context.router() != null) { if (_context.router() != null) {
@ -775,4 +753,50 @@ public class ProfileOrganizer {
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK)); private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } } private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
/**
* Read in all of the profiles specified and print out
* their calculated values. Usage: <pre>
* ProfileOrganizer [filename]*
* </pre>
*/
public static void main(String args[]) {
RouterContext ctx = new RouterContext(new net.i2p.router.Router());
ProfileOrganizer organizer = new ProfileOrganizer(ctx);
organizer.setUs(Hash.FAKE_HASH);
ProfilePersistenceHelper helper = new ProfilePersistenceHelper(ctx);
for (int i = 0; i < args.length; i++) {
PeerProfile profile = helper.readProfile(new java.io.File(args[i]));
if (profile == null) {
System.err.println("Could not load profile " + args[i]);
continue;
}
organizer.addProfile(profile);
}
organizer.reorganize();
DecimalFormat fmt = new DecimalFormat("0,000.0");
fmt.setPositivePrefix("+");
for (Iterator iter = organizer.selectAllPeers().iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
PeerProfile profile = organizer.getProfile(peer);
if (!profile.getIsActive()) continue;
System.out.println("Peer " + profile.getPeer().toBase64().substring(0,4)
+ " [" + (organizer.isFast(peer) ? "F+R" :
organizer.isHighCapacity(peer) ? "R " :
organizer.isFailing(peer) ? "X " : " ") + "]: "
+ "\t Speed:\t" + fmt.format(profile.getSpeedValue())
+ " Reliability:\t" + fmt.format(profile.getReliabilityValue())
+ " Capacity:\t" + fmt.format(profile.getCapacityValue())
+ " Integration:\t" + fmt.format(profile.getIntegrationValue())
+ " Active?\t" + profile.getIsActive()
+ " Failing?\t" + profile.getIsFailing());
}
System.out.println("Thresholds:");
System.out.println("Speed: " + num(organizer.getSpeedThreshold()) + " (" + organizer.countFastPeers() + " fast peers)");
System.out.println("Capacity: " + num(organizer.getCapacityThreshold()) + " (" + organizer.countHighCapacityPeers() + " reliable peers)");
}
} }
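For reference, a hypothetical invocation of the new CLI (the classpath, working directory, and profile file names are assumptions, not taken from this commit): something like java -cp lib/i2p.jar:lib/router.jar net.i2p.router.peermanager.ProfileOrganizer peerProfiles/profile-*.dat would load each stored profile, reorganize them, and print the calculated speed, capacity, and integration values for the active peers along with the resulting thresholds.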


@ -65,10 +65,10 @@ class ProfilePersistenceHelper {
String groups = null; String groups = null;
if (_context.profileOrganizer().isFailing(profile.getPeer())) { if (_context.profileOrganizer().isFailing(profile.getPeer())) {
groups = "failing"; groups = "failing";
} else if (!_context.profileOrganizer().isReliable(profile.getPeer())) { } else if (!_context.profileOrganizer().isHighCapacity(profile.getPeer())) {
groups = "not failing"; groups = "not failing";
} else { } else {
if (_context.profileOrganizer().isFastAndReliable(profile.getPeer())) if (_context.profileOrganizer().isFast(profile.getPeer()))
groups = "fast and reliable"; groups = "fast and reliable";
else else
groups = "reliable"; groups = "reliable";


@ -22,6 +22,7 @@ public class TunnelHistory {
private volatile long _lifetimeFailed; private volatile long _lifetimeFailed;
private volatile long _lastFailed; private volatile long _lastFailed;
private RateStat _rejectRate; private RateStat _rejectRate;
private RateStat _failRate;
public TunnelHistory(RouterContext context) { public TunnelHistory(RouterContext context) {
_context = context; _context = context;
@ -36,7 +37,8 @@ public class TunnelHistory {
} }
private void createRates() { private void createRates() {
_rejectRate = new RateStat("tunnelHistory.rejectRate", "How often does this peer reject a tunnel request?", "tunnelHistory", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _rejectRate = new RateStat("tunnelHistory.rejectRate", "How often does this peer reject a tunnel request?", "tunnelHistory", new long[] { 60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l });
_failRate = new RateStat("tunnelHistory.failRate", "How often do tunnels this peer accepts fail?", "tunnelHistory", new long[] { 60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l });
} }
/** total tunnels the peer has agreed to participate in */ /** total tunnels the peer has agreed to participate in */
@ -63,6 +65,7 @@ public class TunnelHistory {
} }
public void incrementFailed() { public void incrementFailed() {
_lifetimeFailed++; _lifetimeFailed++;
_failRate.addData(1, 1);
_lastFailed = _context.clock().now(); _lastFailed = _context.clock().now();
} }
@ -74,6 +77,14 @@ public class TunnelHistory {
public void setLastFailed(long when) { _lastFailed = when; } public void setLastFailed(long when) { _lastFailed = when; }
public RateStat getRejectionRate() { return _rejectRate; } public RateStat getRejectionRate() { return _rejectRate; }
public RateStat getFailedRate() { return _failRate; }
public void coallesceStats() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Coallescing stats");
_rejectRate.coallesceStats();
_failRate.coallesceStats();
}
private final static String NL = System.getProperty("line.separator"); private final static String NL = System.getProperty("line.separator");
@ -91,6 +102,7 @@ public class TunnelHistory {
add(buf, "lifetimeRejected", _lifetimeRejected, "How many tunnels has the peer ever refused to participate in?"); add(buf, "lifetimeRejected", _lifetimeRejected, "How many tunnels has the peer ever refused to participate in?");
out.write(buf.toString().getBytes()); out.write(buf.toString().getBytes());
_rejectRate.store(out, "tunnelHistory.rejectRate"); _rejectRate.store(out, "tunnelHistory.rejectRate");
_rejectRate.store(out, "tunnelHistory.failRate");
} }
private void add(StringBuffer buf, String name, long val, String description) { private void add(StringBuffer buf, String name, long val, String description) {
@ -107,12 +119,15 @@ public class TunnelHistory {
_lifetimeRejected = getLong(props, "tunnels.lifetimeRejected"); _lifetimeRejected = getLong(props, "tunnels.lifetimeRejected");
try { try {
_rejectRate.load(props, "tunnelHistory.rejectRate", true); _rejectRate.load(props, "tunnelHistory.rejectRate", true);
_log.debug("Loading tunnelHistory.rejectRate"); if (_log.shouldLog(Log.DEBUG))
_log.debug("Loading tunnelHistory.rejectRate");
_rejectRate.load(props, "tunnelHistory.failRate", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loading tunnelHistory.failRate");
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
_log.warn("TunnelHistory reject rate is corrupt, resetting", iae); _log.warn("TunnelHistory rates are corrupt, resetting", iae);
createRates(); createRates();
} }
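One detail worth a second look in this hunk: both the store and the load of the new fail rate go through _rejectRate. If the intent was to persist _failRate under its own key alongside the reject rate, the calls would presumably read:

        _failRate.store(out, "tunnelHistory.failRate");
        ...
        _failRate.load(props, "tunnelHistory.failRate", true);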
} }
private final static long getLong(Properties props, String key) { private final static long getLong(Properties props, String key) {