2005-03-16 jrandom

* Adjust the old speed calculator to include end to end RTT data in its
      estimates, and use that as the primary speed calculator again.
    * Use the mean of the high capacity speeds to determine the fast
      threshold, rather than the median.  Perhaps we should use the mean of
      all active non-failing peers?
    * Updated the profile page to sort by tier, then alphabetically.
    * Added some alternative socketManager factories (good call aum!)
This commit is contained in:
jrandom
2005-03-17 05:29:55 +00:00
committed by zzz
parent 046778404e
commit 538dd07e7b
13 changed files with 248 additions and 60 deletions

View File

@ -116,8 +116,8 @@ public class RouterContext extends I2PAppContext {
_throttle = new RouterDoSThrottle(this);
_isFailingCalc = new IsFailingCalculator(this);
_integrationCalc = new IntegrationCalculator(this);
_speedCalc = new StrictSpeedCalculator(this);
_oldSpeedCalc = new SpeedCalculator(this);
_speedCalc = new SpeedCalculator(this);
_oldSpeedCalc = new StrictSpeedCalculator(this);
_reliabilityCalc = new ReliabilityCalculator(this);
_capacityCalc = new CapacityCalculator(this);
}

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.162 $ $Date: 2005/03/11 17:23:41 $";
public final static String ID = "$Revision: 1.163 $ $Date: 2005/03/14 22:47:15 $";
public final static String VERSION = "0.5.0.2";
public final static long BUILD = 3;
public final static long BUILD = 4;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -52,10 +52,11 @@ class OutboundClientMessageJobHelper {
* @return garlic, or null if no tunnels were found (or other errors)
*/
static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK,
Payload data, Hash from, Destination dest, SessionKey wrappedKey, Set wrappedTags,
Payload data, Hash from, Destination dest, TunnelInfo replyTunnel,
SessionKey wrappedKey, Set wrappedTags,
boolean requireAck, LeaseSet bundledReplyLeaseSet) {
PayloadGarlicConfig dataClove = buildDataClove(ctx, data, dest, expiration);
return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, wrappedKey,
return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, wrappedKey,
wrappedTags, requireAck, bundledReplyLeaseSet);
}
/**
@ -65,9 +66,9 @@ class OutboundClientMessageJobHelper {
* @return garlic, or null if no tunnels were found (or other errors)
*/
static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK,
PayloadGarlicConfig dataClove, Hash from, Destination dest, SessionKey wrappedKey,
PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, SessionKey wrappedKey,
Set wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) {
GarlicConfig config = createGarlicConfig(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, requireAck, bundledReplyLeaseSet);
GarlicConfig config = createGarlicConfig(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, requireAck, bundledReplyLeaseSet);
if (config == null)
return null;
GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags);
@ -75,7 +76,7 @@ class OutboundClientMessageJobHelper {
}
private static GarlicConfig createGarlicConfig(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK,
PayloadGarlicConfig dataClove, Hash from, Destination dest, boolean requireAck,
PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, boolean requireAck,
LeaseSet bundledReplyLeaseSet) {
Log log = ctx.logManager().getLog(OutboundClientMessageJobHelper.class);
if (log.shouldLog(Log.DEBUG))
@ -85,7 +86,7 @@ class OutboundClientMessageJobHelper {
config.addClove(dataClove);
if (requireAck) {
PayloadGarlicConfig ackClove = buildAckClove(ctx, from, replyToken, expiration);
PayloadGarlicConfig ackClove = buildAckClove(ctx, from, replyTunnel, replyToken, expiration);
if (ackClove == null)
return null; // no tunnels
config.addClove(ackClove);
@ -122,14 +123,13 @@ class OutboundClientMessageJobHelper {
/**
* Build a clove that sends a DeliveryStatusMessage to us
*/
private static PayloadGarlicConfig buildAckClove(RouterContext ctx, Hash from, long replyToken, long expiration) {
private static PayloadGarlicConfig buildAckClove(RouterContext ctx, Hash from, TunnelInfo replyToTunnel, long replyToken, long expiration) {
Log log = ctx.logManager().getLog(OutboundClientMessageJobHelper.class);
PayloadGarlicConfig ackClove = new PayloadGarlicConfig();
Hash replyToTunnelRouter = null; // inbound tunnel gateway
TunnelId replyToTunnelId = null; // tunnel id on that gateway
TunnelInfo replyToTunnel = ctx.tunnelManager().selectInboundTunnel(from);
if (replyToTunnel == null) {
if (log.shouldLog(Log.ERROR))
log.error("Unable to send client message from " + from.toBase64()

View File

@ -61,6 +61,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private long _start;
private boolean _finished;
private long _leaseSetLookupBegin;
private TunnelInfo _outTunnel;
private TunnelInfo _inTunnel;
/**
* final timeout (in milliseconds) that the outbound message will fail in.
@ -312,10 +314,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
replyLeaseSet = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
}
_inTunnel = selectInboundTunnel();
GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token,
_overallExpiration, key,
_clove, _from.calculateHash(),
_to,
_to, _inTunnel,
sessKey, tags,
true, replyLeaseSet);
if (msg == null) {
@ -341,16 +345,16 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
+ _lease.getTunnelId() + " on "
+ _lease.getGateway().toBase64());
TunnelInfo outTunnel = selectOutboundTunnel();
if (outTunnel != null) {
_outTunnel = selectOutboundTunnel();
if (_outTunnel != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Sending tunnel message out " + outTunnel.getSendTunnelId(0) + " to "
_log.debug(getJobId() + ": Sending tunnel message out " + _outTunnel.getSendTunnelId(0) + " to "
+ _toString + " at "
+ _lease.getTunnelId() + " on "
+ _lease.getGateway().toBase64());
// dispatch may take 100+ms, so toss it in its own job
getContext().jobQueue().addJob(new DispatchJob(getContext(), msg, outTunnel, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now())));
getContext().jobQueue().addJob(new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now())));
} else {
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": Could not find any outbound tunnels to send the payload through... wtf?");
@ -362,15 +366,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private class DispatchJob extends JobImpl {
private GarlicMessage _msg;
private TunnelInfo _outTunnel;
private ReplySelector _selector;
private SendSuccessJob _replyFound;
private SendTimeoutJob _replyTimeout;
private int _timeoutMs;
public DispatchJob(RouterContext ctx, GarlicMessage msg, TunnelInfo out, ReplySelector sel, SendSuccessJob success, SendTimeoutJob timeout, int timeoutMs) {
public DispatchJob(RouterContext ctx, GarlicMessage msg, ReplySelector sel, SendSuccessJob success, SendTimeoutJob timeout, int timeoutMs) {
super(ctx);
_msg = msg;
_outTunnel = out;
_selector = sel;
_replyFound = success;
_replyTimeout = timeout;
@ -396,6 +398,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private TunnelInfo selectOutboundTunnel() {
return getContext().tunnelManager().selectOutboundTunnel(_from.calculateHash());
}
/**
* Pick an arbitrary outbound tunnel for any deliveryStatusMessage to come back in
*
*/
private TunnelInfo selectInboundTunnel() {
return getContext().tunnelManager().selectInboundTunnel(_from.calculateHash());
}
/**
* give up the ghost, this message just aint going through. tell the client to fuck off.
@ -540,8 +549,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().statManager().addRateData("client.sendAckTime", sendTime, 0);
getContext().statManager().addRateData("client.sendMessageSize", _clientMessageSize, sendTime);
if (_outTunnel != null)
for (int i = 0; i < _outTunnel.getLength(); i++)
getContext().profileManager().tunnelTestSucceeded(_outTunnel.getPeer(i), sendTime);
if (_inTunnel != null)
for (int i = 0; i < _inTunnel.getLength(); i++)
getContext().profileManager().tunnelTestSucceeded(_inTunnel.getPeer(i), sendTime);
}
public void setMessage(I2NPMessage msg) {}
}

View File

@ -26,6 +26,7 @@ public class PeerProfile {
private RateStat _dbResponseTime = null;
private RateStat _tunnelCreateResponseTime = null;
private RateStat _tunnelTestResponseTime = null;
private RateStat _tunnelTestResponseTimeSlow = null;
private RateStat _commError = null;
private RateStat _dbIntroduction = null;
// calculation bonuses
@ -143,6 +144,8 @@ public class PeerProfile {
public RateStat getTunnelCreateResponseTime() { return _tunnelCreateResponseTime; }
/** how long it takes to successfully test a tunnel this peer participates in (in milliseconds), calculated over a 10 minute, 1 hour, and 1 day period */
public RateStat getTunnelTestResponseTime() { return _tunnelTestResponseTime; }
/** how long it takes to successfully test the peer (in milliseconds) when the time exceeds 5s */
public RateStat getTunnelTestResponseTimeSlow() { return _tunnelTestResponseTimeSlow; }
/** how long between communication errors with the peer (disconnection, etc), calculated over a 1 minute, 1 hour, and 1 day period */
public RateStat getCommError() { return _commError; }
/** how many new peers we get from dbSearchReplyMessages or dbStore messages, calculated over a 1 hour, 1 day, and 1 week period */
@ -224,6 +227,7 @@ public class PeerProfile {
_dbResponseTime = null;
_tunnelCreateResponseTime = null;
_tunnelTestResponseTime = null;
_tunnelTestResponseTimeSlow = null;
_commError = null;
_dbIntroduction = null;
_tunnelHistory = null;
@ -253,6 +257,8 @@ public class PeerProfile {
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelTestResponseTime == null)
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelTestResponseTimeSlow == null)
_tunnelTestResponseTimeSlow = new RateStat("tunnelTestResponseTimeSlow", "how long it takes to successfully test a peer when the time exceeds 5s", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l, });
if (_commError == null)
_commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", group, new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbIntroduction == null)
@ -269,6 +275,7 @@ public class PeerProfile {
_dbResponseTime.setStatLog(_context.statManager().getStatLog());
_tunnelCreateResponseTime.setStatLog(_context.statManager().getStatLog());
_tunnelTestResponseTime.setStatLog(_context.statManager().getStatLog());
_tunnelTestResponseTimeSlow.setStatLog(_context.statManager().getStatLog());
_commError.setStatLog(_context.statManager().getStatLog());
_dbIntroduction.setStatLog(_context.statManager().getStatLog());
_expanded = true;
@ -285,6 +292,7 @@ public class PeerProfile {
_sendSuccessSize.coalesceStats();
_tunnelCreateResponseTime.coalesceStats();
_tunnelTestResponseTime.coalesceStats();
_tunnelTestResponseTimeSlow.coalesceStats();
_dbHistory.coalesceStats();
_tunnelHistory.coalesceStats();

View File

@ -112,6 +112,13 @@ public class ProfileManagerImpl implements ProfileManager {
PeerProfile data = getProfile(peer);
if (data == null) return;
data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs);
if (responseTimeMs > getSlowThreshold())
data.getTunnelTestResponseTimeSlow().addData(responseTimeMs, responseTimeMs);
}
private int getSlowThreshold() {
// perhaps we should have this compare vs. tunnel.testSuccessTime?
return 5*1000;
}
/**

View File

@ -644,6 +644,10 @@ public class ProfileOrganizer {
* (highest first) for active nonfailing peers
*/
private void locked_calculateSpeedThreshold(Set reordered) {
if (true) {
locked_calculateSpeedThresholdMean(reordered);
return;
}
Set speeds = new TreeSet();
for (Iterator iter = reordered.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next();
@ -669,6 +673,28 @@ public class ProfileOrganizer {
_log.info("Threshold value for speed: " + _thresholdSpeedValue + " out of speeds: " + speeds);
}
private void locked_calculateSpeedThresholdMean(Set reordered) {
double total = 0;
int count = 0;
for (Iterator iter = reordered.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next();
if (profile.getCapacityValue() >= _thresholdCapacityValue) {
// duplicates being clobbered is fine by us
total += profile.getSpeedValue();
count++;
} else {
// its ordered
break;
}
}
if (count > 0)
_thresholdSpeedValue = total / count;
if (_log.shouldLog(Log.INFO))
_log.info("Threshold value for speed: " + _thresholdSpeedValue + " out of speeds: " + count);
}
/** simple average, or 0 if NaN */
private final static double avg(double total, double quantity) {
if ( (total > 0) && (quantity > 0) )

View File

@ -6,10 +6,11 @@ import java.io.Writer;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Locale;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
@ -21,23 +22,25 @@ import net.i2p.router.RouterContext;
class ProfileOrganizerRenderer {
private RouterContext _context;
private ProfileOrganizer _organizer;
private ProfileComparator _comparator;
public ProfileOrganizerRenderer(ProfileOrganizer organizer, RouterContext context) {
_context = context;
_organizer = organizer;
_comparator = new ProfileComparator();
}
public void renderStatusHTML(Writer out) throws IOException {
Set peers = _organizer.selectAllPeers();
long hideBefore = _context.clock().now() - 3*60*60*1000;
TreeMap order = new TreeMap();
TreeSet order = new TreeSet(_comparator);
for (Iterator iter = peers.iterator(); iter.hasNext();) {
Hash peer = (Hash)iter.next();
if (_organizer.getUs().equals(peer)) continue;
PeerProfile prof = _organizer.getProfile(peer);
if (prof.getLastSendSuccessful() <= hideBefore) continue;
order.put(peer.toBase64(), prof);
order.add(prof);
}
int fast = 0;
@ -56,11 +59,35 @@ class ProfileOrganizerRenderer {
buf.append("<td><b>Failing?</b></td>");
buf.append("<td>&nbsp;</td>");
buf.append("</tr>");
for (Iterator iter = order.keySet().iterator(); iter.hasNext();) {
String name = (String)iter.next();
PeerProfile prof = (PeerProfile)order.get(name);
int prevTier = 1;
for (Iterator iter = order.iterator(); iter.hasNext();) {
PeerProfile prof = (PeerProfile)iter.next();
Hash peer = prof.getPeer();
int tier = 0;
boolean isIntegrated = false;
if (_organizer.isFast(peer)) {
tier = 1;
fast++;
reliable++;
} else if (_organizer.isHighCapacity(peer)) {
tier = 2;
reliable++;
} else if (_organizer.isFailing(peer)) {
failing++;
} else {
tier = 3;
}
if (_organizer.isWellIntegrated(peer)) {
isIntegrated = true;
integrated++;
}
if (tier != prevTier)
buf.append("<tr><td colspan=\"7\"><hr /></td></tr>\n");
prevTier = tier;
buf.append("<tr>");
buf.append("<td><code>");
if (prof.getIsFailing()) {
@ -74,25 +101,6 @@ class ProfileOrganizerRenderer {
}
buf.append("</code></td>");
buf.append("<td>");
int tier = 0;
boolean isIntegrated = false;
if (_organizer.isFast(peer)) {
tier = 1;
fast++;
reliable++;
} else if (_organizer.isHighCapacity(peer)) {
tier = 2;
reliable++;
} else if (_organizer.isFailing(peer)) {
failing++;
} else {
tier = 3;
}
if (_organizer.isWellIntegrated(peer)) {
isIntegrated = true;
integrated++;
}
switch (tier) {
case 1: buf.append("Fast"); break;
@ -131,6 +139,56 @@ class ProfileOrganizerRenderer {
out.flush();
}
private class ProfileComparator implements Comparator {
public int compare(Object lhs, Object rhs) {
if ( (lhs == null) || (rhs == null) )
throw new NullPointerException("lhs=" + lhs + " rhs=" + rhs);
if ( !(lhs instanceof PeerProfile) || !(rhs instanceof PeerProfile) )
throw new ClassCastException("lhs=" + lhs.getClass().getName() + " rhs=" + rhs.getClass().getName());
PeerProfile left = (PeerProfile)lhs;
PeerProfile right = (PeerProfile)rhs;
if (_context.profileOrganizer().isFast(left.getPeer())) {
if (_context.profileOrganizer().isFast(right.getPeer())) {
return compareHashes(left, right);
} else {
return -1; // fast comes first
}
} else if (_context.profileOrganizer().isHighCapacity(left.getPeer())) {
if (_context.profileOrganizer().isFast(right.getPeer())) {
return 1;
} else if (_context.profileOrganizer().isHighCapacity(right.getPeer())) {
return compareHashes(left, right);
} else {
return -1;
}
} else if (_context.profileOrganizer().isFailing(left.getPeer())) {
if (_context.profileOrganizer().isFailing(right.getPeer())) {
return compareHashes(left, right);
} else {
return 1;
}
} else {
// left is not failing
if (_context.profileOrganizer().isFast(right.getPeer())) {
return 1;
} else if (_context.profileOrganizer().isHighCapacity(right.getPeer())) {
return 1;
} else if (_context.profileOrganizer().isFailing(right.getPeer())) {
return -1;
} else {
return compareHashes(left, right);
}
}
}
private int compareHashes(PeerProfile left, PeerProfile right) {
return left.getPeer().toBase64().compareTo(right.getPeer().toBase64());
}
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
}

View File

@ -126,6 +126,7 @@ class ProfilePersistenceHelper {
profile.getSendSuccessSize().store(out, "sendSuccessSize");
profile.getTunnelCreateResponseTime().store(out, "tunnelCreateResponseTime");
profile.getTunnelTestResponseTime().store(out, "tunnelTestResponseTime");
profile.getTunnelTestResponseTimeSlow().store(out, "tunnelTestResponseTimeSlow");
}
}
@ -189,6 +190,7 @@ class ProfilePersistenceHelper {
profile.getSendSuccessSize().load(props, "sendSuccessSize", true);
profile.getTunnelCreateResponseTime().load(props, "tunnelCreateResponseTime", true);
profile.getTunnelTestResponseTime().load(props, "tunnelTestResponseTime", true);
profile.getTunnelTestResponseTimeSlow().load(props, "tunnelTestResponseTimeSlow", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loaded the profile for " + peer.toBase64() + " from " + file.getName());

View File

@ -72,10 +72,35 @@ public class SpeedCalculator extends Calculator {
double estimateFactor = getEstimateFactor(threshold, events);
double rv = (1-estimateFactor)*measuredRTPerMinute + (estimateFactor)*estimatedRTPerMinute;
long slowCount = 0;
RateStat rs = profile.getTunnelTestResponseTimeSlow();
if (rs != null) {
Rate r = rs.getRate(period);
if (r != null)
slowCount = r.getCurrentEventCount();
}
long fastCount = 0;
rs = profile.getTunnelTestResponseTime();
if (rs != null) {
Rate r = rs.getRate(period);
if (r != null)
fastCount = r.getCurrentEventCount();
}
double slowPct = 0;
if (fastCount > 0)
slowPct = slowCount / fastCount;
else
rv /= 100; // if there are no tunnel tests, weigh against it
if (slowPct > 0.01) // if 1% of the traffic is dogshit slow, the peer probably sucks
rv /= 100.0*slowPct;
rv = adjust(period, rv);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("\n\nrv: " + rv + " events: " + events + " threshold: " + threshold + " period: " + period + " useTunnelTestOnly? " + tunnelTestOnly + "\n"
+ "measuredRTT: " + measuredRoundTripTime + " measured events per minute: " + measuredRTPerMinute + "\n"
+ "estimateRTT: " + estimatedRoundTripTime + " estimated events per minute: " + estimatedRTPerMinute + "\n"
+ "slow count: " + slowCount + " fast count: " + fastCount + "\n"
+ "estimateFactor: " + estimateFactor + "\n"
+ "for peer: " + profile.getPeer().toBase64());
}
@ -83,6 +108,19 @@ public class SpeedCalculator extends Calculator {
rv += profile.getSpeedBonus();
return rv;
}
private double adjust(long period, double value) {
switch ((int)period) {
case 10*60*1000:
return value;
case 60*60*1000:
return value * 0.5;
case 24*60*60*1000:
return value * 0.001;
default:
return value * 0.0001;
}
}
/**
* How much do we want to prefer the measured values more than the estimated