diff --git a/router/java/src/net/i2p/router/ProfileManager.java b/router/java/src/net/i2p/router/ProfileManager.java index c49c85bd7a..28c2683acc 100644 --- a/router/java/src/net/i2p/router/ProfileManager.java +++ b/router/java/src/net/i2p/router/ProfileManager.java @@ -50,6 +50,13 @@ public interface ProfileManager { */ void tunnelRejected(Hash peer, long responseTimeMs); + /** + * Note that a tunnel that the router is participating in + * was successfully tested with the given round trip latency + * + */ + void tunnelTestSucceeded(Hash peer, long responseTimeMs); + /** * Note that the peer participated in a tunnel that failed. Its failure may not have * been the peer's fault however. diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 6142c658d8..435da9fabb 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -104,6 +104,7 @@ public class StatisticsManager implements Service { includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l }); includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 }); diff --git a/router/java/src/net/i2p/router/TunnelInfo.java b/router/java/src/net/i2p/router/TunnelInfo.java index 7589cea715..f64656ebc0 100644 --- a/router/java/src/net/i2p/router/TunnelInfo.java +++ b/router/java/src/net/i2p/router/TunnelInfo.java @@ -48,6 +48,7 @@ public class TunnelInfo extends DataStructureImpl { private Properties _options; private TunnelSettings _settings; private long _created; + private long _lastTested; private boolean _ready; private boolean _wasEverReady; @@ -67,6 +68,7 @@ public class TunnelInfo extends DataStructureImpl { _ready = false; _wasEverReady = false; _created = _context.clock().now(); + _lastTested = -1; } public TunnelId getTunnelId() { return _id; } @@ -142,6 +144,10 @@ public class TunnelInfo extends DataStructureImpl { public long getCreated() { return _created; } + /** when was the peer last tested (or -1 if never)? */ + public long getLastTested() { return _lastTested; } + public void setLastTested(long when) { _lastTested = when; } + /** * Number of hops left in the tunnel (including this one) * diff --git a/router/java/src/net/i2p/router/peermanager/PeerProfile.java b/router/java/src/net/i2p/router/peermanager/PeerProfile.java index 3eda878848..4e1a3b08b5 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerProfile.java +++ b/router/java/src/net/i2p/router/peermanager/PeerProfile.java @@ -4,8 +4,9 @@ import net.i2p.data.Hash; import net.i2p.stat.RateStat; import net.i2p.util.Log; import net.i2p.router.RouterContext; +import java.io.File; -class PeerProfile { +public class PeerProfile { private Log _log; private RouterContext _context; // whoozaat? @@ -22,6 +23,7 @@ class PeerProfile { private RateStat _receiveSize = null; private RateStat _dbResponseTime = null; private RateStat _tunnelCreateResponseTime = null; + private RateStat _tunnelTestResponseTime = null; private RateStat _commError = null; private RateStat _dbIntroduction = null; // calculation bonuses @@ -63,7 +65,7 @@ class PeerProfile { public void setPeer(Hash peer) { _peer = peer; } /** - * are we keeping an expanded profile on the peer, or just the bare minimum? + * are we keeping an expanded profile on the peer, or just the bare minimum. * If we aren't keeping the expanded profile, all of the rates as well as the * TunnelHistory and DBHistory will not be available. * @@ -123,7 +125,9 @@ class PeerProfile { public RateStat getDbResponseTime() { return _dbResponseTime; } /** how long it takes to get a tunnel create response from the peer (in milliseconds), calculated over a 1 minute, 1 hour, and 1 day period */ public RateStat getTunnelCreateResponseTime() { return _tunnelCreateResponseTime; } - /** how long between communication errors with the peer (e.g. disconnection), calculated over a 1 minute, 1 hour, and 1 day period */ + /** how long it takes to successfully test a tunnel this peer participates in (in milliseconds), calculated over a 10 minute, 1 hour, and 1 day period */ + public RateStat getTunnelTestResponseTime() { return _tunnelTestResponseTime; } + /** how long between communication errors with the peer (disconnection, etc), calculated over a 1 minute, 1 hour, and 1 day period */ public RateStat getCommError() { return _commError; } /** how many new peers we get from dbSearchReplyMessages or dbStore messages, calculated over a 1 hour, 1 day, and 1 week period */ public RateStat getDbIntroduction() { return _dbIntroduction; } @@ -160,7 +164,7 @@ class PeerProfile { */ public double getSpeedValue() { return _speedValue; } /** - * How likely are they to stay up and pass on messages over the next few minutes? + * How likely are they to stay up and pass on messages over the next few minutes. * Positive numbers means more likely, negative numbers means its probably not * even worth trying. * @@ -189,6 +193,7 @@ class PeerProfile { _receiveSize = null; _dbResponseTime = null; _tunnelCreateResponseTime = null; + _tunnelTestResponseTime = null; _commError = null; _dbIntroduction = null; _tunnelHistory = null; @@ -212,9 +217,11 @@ class PeerProfile { if (_receiveSize == null) _receiveSize = new RateStat("receiveSize", "How large received messages are", "profile", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_dbResponseTime == null) - _dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } ); + _dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_tunnelCreateResponseTime == null) - _tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } ); + _tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); + if (_tunnelTestResponseTime == null) + _tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_commError == null) _commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_dbIntroduction == null) @@ -238,6 +245,7 @@ class PeerProfile { _sendFailureSize.coallesceStats(); _sendSuccessSize.coallesceStats(); _tunnelCreateResponseTime.coallesceStats(); + _tunnelTestResponseTime.coallesceStats(); _dbHistory.coallesceStats(); _speedValue = calculateSpeed(); @@ -270,7 +278,7 @@ class PeerProfile { * for an expanded profile, and ~212 bytes for a compacted one. * */ - public static void main(String args[]) { + public static void main2(String args[]) { RouterContext ctx = new RouterContext(null); testProfileSize(ctx, 100, 0); // 560KB testProfileSize(ctx, 1000, 0); // 3.9MB @@ -280,6 +288,36 @@ class PeerProfile { testProfileSize(ctx, 0, 300000); // 63MB } + /** + * Read in all of the profiles specified and print out + * their calculated values. Usage:
+ * PeerProfile [filename]* + *+ */ + public static void main(String args[]) { + RouterContext ctx = new RouterContext(new net.i2p.router.Router()); + ProfilePersistenceHelper helper = new ProfilePersistenceHelper(ctx); + try { Thread.sleep(5*1000); } catch (InterruptedException e) {} + StringBuffer buf = new StringBuffer(1024); + for (int i = 0; i < args.length; i++) { + PeerProfile profile = helper.readProfile(new File(args[i])); + if (profile == null) { + buf.append("Could not load profile ").append(args[i]).append('\n'); + continue; + } + //profile.coallesceStats(); + buf.append("Peer " + profile.getPeer().toBase64() + + ":\t Speed:\t" + profile.calculateSpeed() + + " Reliability:\t" + profile.calculateReliability() + + " Integration:\t" + profile.calculateIntegration() + + " Active?\t" + profile.getIsActive() + + " Failing?\t" + profile.calculateIsFailing() + + '\n'); + } + try { Thread.sleep(5*1000); } catch (InterruptedException e) {} + System.out.println(buf.toString()); + } + private static void testProfileSize(RouterContext ctx, int numExpanded, int numCompact) { Runtime.getRuntime().gc(); PeerProfile profs[] = new PeerProfile[numExpanded]; diff --git a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java index 16d93acf9f..aaef0da7dc 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java @@ -101,6 +101,17 @@ public class ProfileManagerImpl implements ProfileManager { data.getTunnelHistory().incrementRejected(); } + /** + * Note that a tunnel that the router is participating in + * was successfully tested with the given round trip latency + * + */ + public void tunnelTestSucceeded(Hash peer, long responseTimeMs) { + PeerProfile data = getProfile(peer); + if (data == null) return; + data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs); + } + /** * Note that the peer participated in a tunnel that failed. Its failure may not have * been the peer's fault however. diff --git a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java index 21170d8ccc..43b42d8ad9 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java +++ b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java @@ -123,7 +123,9 @@ class ProfilePersistenceHelper { profile.getDbResponseTime().store(out, "dbResponseTime"); profile.getReceiveSize().store(out, "receiveSize"); profile.getSendFailureSize().store(out, "sendFailureSize"); - profile.getSendSuccessSize().store(out, "tunnelCreateResponseTime"); + profile.getSendSuccessSize().store(out, "sendSuccessSize"); + profile.getTunnelCreateResponseTime().store(out, "tunnelCreateResponseTime"); + profile.getTunnelTestResponseTime().store(out, "tunnelTestResponseTime"); } } @@ -154,10 +156,13 @@ class ProfilePersistenceHelper { rv.add(files[i]); return rv; } - private PeerProfile readProfile(File file) { + public PeerProfile readProfile(File file) { Hash peer = getHash(file.getName()); try { - if (peer == null) return null; + if (peer == null) { + _log.error("The file " + file.getName() + " is not a valid hash"); + return null; + } PeerProfile profile = new PeerProfile(_context, peer); Properties props = new Properties(); @@ -181,7 +186,9 @@ class ProfilePersistenceHelper { profile.getDbResponseTime().load(props, "dbResponseTime", true); profile.getReceiveSize().load(props, "receiveSize", true); profile.getSendFailureSize().load(props, "sendFailureSize", true); - profile.getSendSuccessSize().load(props, "tunnelCreateResponseTime", true); + profile.getSendSuccessSize().load(props, "sendSuccessSize", true); + profile.getTunnelCreateResponseTime().load(props, "tunnelCreateResponseTime", true); + profile.getTunnelTestResponseTime().load(props, "tunnelTestResponseTime", true); if (_log.shouldLog(Log.DEBUG)) _log.debug("Loaded the profile for " + peer.toBase64() + " from " + file.getName()); @@ -222,7 +229,7 @@ class ProfilePersistenceHelper { props.setProperty(key, val); } } catch (IOException ioe) { - _log.error("Error loading properties from " + file.getName(), ioe); + _log.warn("Error loading properties from " + file.getName(), ioe); } finally { if (in != null) try { in.close(); } catch (IOException ioe) {} } @@ -237,6 +244,7 @@ class ProfilePersistenceHelper { h.fromBase64(key); return h; } catch (DataFormatException dfe) { + _log.warn("Invalid base64 [" + key + "]", dfe); return null; } } @@ -247,10 +255,16 @@ class ProfilePersistenceHelper { private File getProfileDir() { if (_profileDir == null) { - String dir = _context.router().getConfigSetting(PROP_PEER_PROFILE_DIR); - if (dir == null) { - _log.info("No peer profile dir specified [" + PROP_PEER_PROFILE_DIR + "], using [" + DEFAULT_PEER_PROFILE_DIR + "]"); - dir = DEFAULT_PEER_PROFILE_DIR; + String dir = null; + if (_context.router() == null) { + dir = _context.getProperty(PROP_PEER_PROFILE_DIR, DEFAULT_PEER_PROFILE_DIR); + } else { + dir = _context.router().getConfigSetting(PROP_PEER_PROFILE_DIR); + if (dir == null) { + _log.info("No peer profile dir specified [" + PROP_PEER_PROFILE_DIR + + "], using [" + DEFAULT_PEER_PROFILE_DIR + "]"); + dir = DEFAULT_PEER_PROFILE_DIR; + } } _profileDir = new File(dir); } diff --git a/router/java/src/net/i2p/router/peermanager/ReliabilityCalculator.java b/router/java/src/net/i2p/router/peermanager/ReliabilityCalculator.java index bf1eb8ebe9..a1ae587253 100644 --- a/router/java/src/net/i2p/router/peermanager/ReliabilityCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/ReliabilityCalculator.java @@ -29,8 +29,7 @@ public class ReliabilityCalculator extends Calculator { val += profile.getSendSuccessSize().getRate(60*60*1000).getLastEventCount(); val += profile.getSendSuccessSize().getRate(60*60*1000).getCurrentEventCount(); - val += profile.getTunnelCreateResponseTime().getRate(60*1000).getCurrentEventCount() * 10; - val += profile.getTunnelCreateResponseTime().getRate(60*1000).getLastEventCount() * 5; + val += profile.getTunnelCreateResponseTime().getRate(10*60*1000).getLastEventCount() * 5; val += profile.getTunnelCreateResponseTime().getRate(60*60*1000).getCurrentEventCount(); val += profile.getTunnelCreateResponseTime().getRate(60*60*1000).getLastEventCount(); diff --git a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java index 2aac1b4e05..e2f65bfea5 100644 --- a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java @@ -8,60 +8,318 @@ import net.i2p.router.RouterContext; /** * Quantify how fast the peer is - how fast they respond to our requests, how fast * they pass messages on, etc. This should be affected both by their bandwidth/latency, - * as well as their load. + * as well as their load. The essence of the current algorithm is to determine + * approximately how many 2KB messages the peer can pass round trip within a single + * minute - not based just on itself though, but including the delays of other peers + * in the tunnels. As such, more events make it more accurate. * */ public class SpeedCalculator extends Calculator { private Log _log; private RouterContext _context; + /** + * minimum number of events to use a particular period's data. If this many + * events haven't occurred in the period yet, the next largest period is tried. + */ + public static final String PROP_EVENT_THRESHOLD = "speedCalculator.eventThreshold"; + public static final int DEFAULT_EVENT_THRESHOLD = 50; + /** should the calculator use instantaneous rates, or period averages? */ + public static final String PROP_USE_INSTANTANEOUS_RATES = "speedCalculator.useInstantaneousRates"; + public static final boolean DEFAULT_USE_INSTANTANEOUS_RATES = false; + /** should the calculator use tunnel test time only, or include all data? */ + public static final String PROP_USE_TUNNEL_TEST_ONLY = "speedCalculator.useTunnelTestOnly"; + public static final boolean DEFAULT_USE_TUNNEL_TEST_ONLY = true; + public SpeedCalculator(RouterContext context) { _context = context; _log = context.logManager().getLog(SpeedCalculator.class); } public double calc(PeerProfile profile) { - double dbResponseTime = profile.getDbResponseTime().getRate(60*1000).getLifetimeAverageValue(); - double tunnelResponseTime = profile.getTunnelCreateResponseTime().getRate(60*1000).getLifetimeAverageValue(); - double roundTripRate = Math.max(dbResponseTime, tunnelResponseTime); + long threshold = getEventThreshold(); + boolean tunnelTestOnly = getUseTunnelTestOnly(); - // send and receive rates are the (period rate) * (saturation %) - double sendRate = calcSendRate(profile); - double receiveRate = calcReceiveRate(profile); + long period = 10*60*1000; + long events = getEventCount(profile, period, tunnelTestOnly); + if (events < threshold) { + period = 60*60*1000l; + events = getEventCount(profile, period, tunnelTestOnly); + if (events < threshold) { + period = 24*60*60*1000; + events = getEventCount(profile, period, tunnelTestOnly); + if (events < threshold) { + period = -1; + events = getEventCount(profile, period, tunnelTestOnly); + } + } + } + double measuredRoundTripTime = getMeasuredRoundTripTime(profile, period, tunnelTestOnly); + double measuredRTPerMinute = 0; + if (measuredRoundTripTime > 0) + measuredRTPerMinute = (60000.0d / measuredRoundTripTime); - double val = 60000.0d - 0.1*roundTripRate + sendRate + receiveRate; - // if we don't have any data, the rate is 0 - if ( (roundTripRate == 0.0d) && (sendRate == 0.0d) ) - val = 0.0; + double estimatedRTPerMinute = 0; + double estimatedRoundTripTime = 0; + if (!tunnelTestOnly) { + estimatedRoundTripTime = getEstimatedRoundTripTime(profile, period); + if (estimatedRoundTripTime > 0) + estimatedRTPerMinute = (60000.0d / estimatedRoundTripTime); + } + + double estimateFactor = getEstimateFactor(threshold, events); + double rv = (1-estimateFactor)*measuredRTPerMinute + (estimateFactor)*estimatedRTPerMinute; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("roundTripRate: " + roundTripRate + "ms sendRate: " + sendRate + "bytes/second, receiveRate: " + receiveRate + "bytes/second, val: " + val + " for " + profile.getPeer().toBase64()); - - val += profile.getSpeedBonus(); - return val; + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("\n\nrv: " + rv + " events: " + events + " threshold: " + threshold + " period: " + period + " useTunnelTestOnly? " + tunnelTestOnly + "\n" + + "measuredRTT: " + measuredRoundTripTime + " measured events per minute: " + measuredRTPerMinute + "\n" + + "estimateRTT: " + estimatedRoundTripTime + " estimated events per minute: " + estimatedRTPerMinute + "\n" + + "estimateFactor: " + estimateFactor + "\n" + + "for peer: " + profile.getPeer().toBase64()); + } + + rv += profile.getSpeedBonus(); + return rv; + } + + /** + * How much do we want to prefer the measured values more than the estimated + * values, as a fraction. The value 1 means ignore the measured values, while + * the value 0 means ignore the estimate, and everything inbetween means, well + * everything inbetween. + * + */ + private double getEstimateFactor(long eventThreshold, long numEvents) { + if (numEvents > eventThreshold) + return 0.0d; + else + return numEvents / eventThreshold; } - private double calcSendRate(PeerProfile profile) { return calcRate(profile.getSendSuccessSize()); } - private double calcReceiveRate(PeerProfile profile) { return calcRate(profile.getReceiveSize()); } - - private double calcRate(RateStat stat) { - double rate = 0.0d; - Rate hourRate = stat.getRate(60*60*1000); - rate = calcRate(hourRate); - return rate; + /** + * How many measured events do we have for the given period? If the period is negative, + * return the lifetime events. + * + */ + private long getEventCount(PeerProfile profile, long period, boolean tunnelTestOnly) { + if (period < 0) { + Rate dbResponseRate = profile.getDbResponseTime().getRate(60*60*1000l); + Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(60*60*1000l); + Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(60*60*1000l); + + long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeEventCount(); + long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeEventCount(); + long tunnelTests = tunnelTestRate.getLifetimeEventCount(); + + return dbResponses + tunnelResponses + tunnelTests; + } else { + Rate dbResponseRate = profile.getDbResponseTime().getRate(period); + Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(period); + Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(period); + + long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getCurrentEventCount(); + long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getCurrentEventCount(); + long tunnelTests = tunnelTestRate.getCurrentEventCount(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("TunnelTests for period " + period + ": " + tunnelTests + + " last: " + tunnelTestRate.getLastEventCount() + " lifetime: " + + tunnelTestRate.getLifetimeEventCount()); + + return dbResponses + tunnelResponses + tunnelTests; + } } - private double calcRate(Rate rate) { - long events = rate.getLastEventCount() + rate.getCurrentEventCount(); + /** + * Retrieve the average measured round trip time within the period specified (including + * db responses, tunnel create responses, and tunnel tests). If the period is negative, + * it uses the lifetime stats. In addition, it weights each of those three measurements + * equally according to their event count (e.g. 4 dbResponses @ 10 seconds and 1 tunnel test + * at 5 seconds will leave the average at 9 seconds) + * + */ + private double getMeasuredRoundTripTime(PeerProfile profile, long period, boolean tunnelTestOnly) { + if (period < 0) { + Rate dbResponseRate = profile.getDbResponseTime().getRate(60*60*1000l); + Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(60*60*1000l); + Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(60*60*1000l); + + long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeEventCount(); + long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeEventCount(); + long tunnelTests = tunnelTestRate.getLifetimeEventCount(); + + double dbResponseTime = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeAverageValue(); + double tunnelResponseTime = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeAverageValue(); + double tunnelTestTime = tunnelTestRate.getLifetimeAverageValue(); + + long events = dbResponses + tunnelResponses + tunnelTests; + if (events <= 0) return 0; + return (dbResponses*dbResponseTime + tunnelResponses*tunnelResponseTime + tunnelTests*tunnelTestTime) + / events; + } else { + Rate dbResponseRate = profile.getDbResponseTime().getRate(period); + Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(period); + Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(period); + + long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getCurrentEventCount(); + long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getCurrentEventCount(); + long tunnelTests = tunnelTestRate.getCurrentEventCount(); + + double dbResponseTime = tunnelTestOnly ? 0 : dbResponseRate.getAverageValue(); + double tunnelResponseTime = tunnelTestOnly ? 0 : tunnelResponseRate.getAverageValue(); + double tunnelTestTime = tunnelTestRate.getAverageValue(); + + long events = dbResponses + tunnelResponses + tunnelTests; + if (events <= 0) return 0; + return (dbResponses*dbResponseTime + tunnelResponses*tunnelResponseTime + tunnelTests*tunnelTestTime) + / events; + } + } + + private double getEstimatedRoundTripTime(PeerProfile profile, long period) { + double estSendTime = getEstimatedSendTime(profile, period); + double estRecvTime = getEstimatedReceiveTime(profile, period); + return estSendTime + estRecvTime; + } + + private double getEstimatedSendTime(PeerProfile profile, long period) { + double bps = calcRate(profile.getSendSuccessSize(), period); + if (bps <= 0) + return 0.0d; + else + return 2048.0d / bps; + } + private double getEstimatedReceiveTime(PeerProfile profile, long period) { + double bps = calcRate(profile.getReceiveSize(), period); + if (bps <= 0) + return 0.0d; + else + return 2048.0d / bps; + } + + private double calcRate(RateStat stat, long period) { + Rate rate = stat.getRate(period); + if (rate == null) return 0.0d; + return calcRate(rate, period); + } + + private double calcRate(Rate rate, long period) { + long events = rate.getCurrentEventCount(); if (events >= 1) { - double ms = rate.getLastTotalEventTime() + rate.getCurrentTotalEventTime(); - double bytes = rate.getLastTotalValue() + rate.getCurrentTotalValue(); + double ms = rate.getCurrentTotalEventTime(); + double bytes = rate.getCurrentTotalValue(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("calculating rate: ms=" + ((int)ms) + " bytes=" + ((int)bytes)); if ( (bytes > 0) && (ms > 0) ) { - return (bytes * 1000.0d) / ms; + if (getUseInstantaneousRates()) { + return (bytes * 1000.0d) / ms; + } else { + // period average + return (bytes * 1000.0d) / period; + } } } return 0.0d; } + /** + * What is the minimum number of measured events we want in a period before + * trusting the values? This first checks the router's configuration, then + * the context, and then finally falls back on a static default (100). + * + */ + private long getEventThreshold() { + if (_context.router() != null) { + String threshold = _context.router().getConfigSetting(PROP_EVENT_THRESHOLD); + if (threshold != null) { + try { + return Long.parseLong(threshold); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Event threshold for speed improperly set in the router config [" + threshold + "]", nfe); + } + } + } + String threshold = _context.getProperty(PROP_EVENT_THRESHOLD, ""+DEFAULT_EVENT_THRESHOLD); + if (threshold != null) { + try { + return Long.parseLong(threshold); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Event threshold for speed improperly set in the router environment [" + threshold + "]", nfe); + } + } + return DEFAULT_EVENT_THRESHOLD; + } + + /** + * Should we use instantaneous rates for the estimated speed, or the period rates? + * This first checks the router's configuration, then the context, and then + * finally falls back on a static default (true). + * + * @return true if we should use instantaneous rates, false if we should use period averages + */ + private boolean getUseInstantaneousRates() { + if (_context.router() != null) { + String val = _context.router().getConfigSetting(PROP_USE_INSTANTANEOUS_RATES); + if (val != null) { + try { + return Boolean.getBoolean(val); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Instantaneous rate for speed improperly set in the router config [" + val + "]", nfe); + } + } + } + String val = _context.getProperty(PROP_USE_INSTANTANEOUS_RATES, ""+DEFAULT_USE_INSTANTANEOUS_RATES); + if (val != null) { + try { + return Boolean.getBoolean(val); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Instantaneous rate for speed improperly set in the router environment [" + val + "]", nfe); + } + } + return DEFAULT_USE_INSTANTANEOUS_RATES; + } + + /** + * Should we only use the measured tunnel testing time, or should we include + * measurements on the db responses and tunnel create responses. This first + * checks the router's configuration, then the context, and then finally falls + * back on a static default (true). + * + * @return true if we should use tunnel test time only, false if we should use all available + */ + private boolean getUseTunnelTestOnly() { + if (_context.router() != null) { + String val = _context.router().getConfigSetting(PROP_USE_TUNNEL_TEST_ONLY); + if (val != null) { + try { + boolean rv = Boolean.getBoolean(val); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("router config said " + PROP_USE_TUNNEL_TEST_ONLY + '=' + val); + return rv; + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Tunnel test only for speed improperly set in the router config [" + val + "]", nfe); + } + } + } + String val = _context.getProperty(PROP_USE_TUNNEL_TEST_ONLY, ""+DEFAULT_USE_TUNNEL_TEST_ONLY); + if (val != null) { + try { + boolean rv = Boolean.getBoolean(val); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("router context said " + PROP_USE_TUNNEL_TEST_ONLY + '=' + val); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Tunnel test only for speed improperly set in the router environment [" + val + "]", nfe); + } + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("no config for " + PROP_USE_TUNNEL_TEST_ONLY + ", using " + DEFAULT_USE_TUNNEL_TEST_ONLY); + return DEFAULT_USE_TUNNEL_TEST_ONLY; + } } diff --git a/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java b/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java index c4429550da..6917c54845 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java @@ -37,26 +37,33 @@ import net.i2p.router.RouterContext; class TestTunnelJob extends JobImpl { private Log _log; - private TunnelId _id; + /** tunnel that we want to test */ + private TunnelId _primaryId; + /** tunnel that is used to help test the primary id */ + private TunnelId _secondaryId; private TunnelPool _pool; private long _nonce; public TestTunnelJob(RouterContext ctx, TunnelId id, TunnelPool pool) { super(ctx); _log = ctx.logManager().getLog(TestTunnelJob.class); - _id = id; + _primaryId = id; _pool = pool; _nonce = ctx.random().nextInt(Integer.MAX_VALUE); } public String getName() { return "Test Tunnel"; } public void runJob() { - if (_log.shouldLog(Log.INFO)) - _log.info("Testing tunnel " + _id.getTunnelId()); - TunnelInfo info = _pool.getTunnelInfo(_id); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Testing tunnel " + _primaryId.getTunnelId()); + TunnelInfo info = _pool.getTunnelInfo(_primaryId); if (info == null) { - _log.error("wtf, why are we testing a tunnel that we do not know about? [" + _id.getTunnelId() + "]", getAddedBy()); + _log.error("wtf, why are we testing a tunnel that we do not know about? [" + + _primaryId.getTunnelId() + "]", getAddedBy()); return; } + + // mark it as something we're testing + info.setLastTested(_context.clock().now()); if (isOutbound(info)) { testOutbound(info); } else { @@ -75,7 +82,7 @@ class TestTunnelJob extends JobImpl { return false; } - private final static long TEST_TIMEOUT = 60*1000; // 60 seconds for a test to succeed + private final static long TEST_TIMEOUT = 30*1000; // 30 seconds for a test to succeed private final static int TEST_PRIORITY = 100; /** @@ -83,22 +90,51 @@ class TestTunnelJob extends JobImpl { * to ourselves and wait for it to arrive. */ private void testOutbound(TunnelInfo info) { - if (_log.shouldLog(Log.INFO)) - _log.info("Testing outbound tunnel " + info); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Testing outbound tunnel " + info); DeliveryStatusMessage msg = new DeliveryStatusMessage(_context); msg.setArrival(new Date(_context.clock().now())); msg.setMessageId(_nonce); Hash us = _context.routerHash(); - TunnelId inboundTunnelId = getReplyTunnel(); - if (inboundTunnelId == null) { + _secondaryId = getReplyTunnel(); + if (_secondaryId == null) { + _context.jobQueue().addJob(new TestFailedJob()); return; } + TunnelInfo inboundInfo = _pool.getTunnelInfo(_secondaryId); + inboundInfo.setLastTested(_context.clock().now()); + TestFailedJob failureJob = new TestFailedJob(); MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId()); - SendTunnelMessageJob testJob = new SendTunnelMessageJob(_context, msg, info.getTunnelId(), us, inboundTunnelId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY); + SendTunnelMessageJob testJob = new SendTunnelMessageJob(_context, msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY); _context.jobQueue().addJob(testJob); } + + /** + * Send a message to the gateway and wait for it to arrive. + */ + private void testInbound(TunnelInfo info) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Testing inbound tunnel " + info); + DeliveryStatusMessage msg = new DeliveryStatusMessage(_context); + msg.setArrival(new Date(_context.clock().now())); + msg.setMessageId(_nonce); + + _secondaryId = getOutboundTunnel(); + if (_secondaryId == null) { + _context.jobQueue().addJob(new TestFailedJob()); + return; + } + + TunnelInfo outboundInfo = _pool.getTunnelInfo(_secondaryId); + outboundInfo.setLastTested(_context.clock().now()); + + TestFailedJob failureJob = new TestFailedJob(); + MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId()); + SendTunnelMessageJob j = new SendTunnelMessageJob(_context, msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, _context.clock().now()+TEST_TIMEOUT, TEST_PRIORITY); + _context.jobQueue().addJob(j); + } /** * Get the tunnel for replies to be sent down when testing outbound tunnels @@ -116,7 +152,7 @@ class TestTunnelJob extends JobImpl { for (int i = 0; i < tunnelIds.size(); i++) { TunnelId id = (TunnelId)tunnelIds.get(i); - if (id.equals(_id)) { + if (id.equals(_primaryId)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Not testing a tunnel with itself [duh]"); } else { @@ -124,39 +160,36 @@ class TestTunnelJob extends JobImpl { } } - _log.error("Unable to test tunnel " + _id + ", since there are NO OTHER INBOUND TUNNELS to receive the ack through"); + _log.error("Unable to test tunnel " + _primaryId + ", since there are NO OTHER INBOUND TUNNELS to receive the ack through"); return null; } /** - * Send a message to the gateway and wait for it to arrive. - * todo: send the message to the gateway via an outbound tunnel or garlic, NOT DIRECT. + * Get the tunnel to send thte message out when testing inbound tunnels + * */ - private void testInbound(TunnelInfo info) { - if (_log.shouldLog(Log.INFO)) - _log.info("Testing inbound tunnel " + info); - DeliveryStatusMessage msg = new DeliveryStatusMessage(_context); - msg.setArrival(new Date(_context.clock().now())); - msg.setMessageId(_nonce); - TestFailedJob failureJob = new TestFailedJob(); - MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId()); - TunnelMessage tmsg = new TunnelMessage(_context); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - msg.writeBytes(baos); - tmsg.setData(baos.toByteArray()); - tmsg.setTunnelId(info.getTunnelId()); - _context.jobQueue().addJob(new SendMessageDirectJob(_context, tmsg, info.getThisHop(), new TestSuccessfulJob(), failureJob, selector, _context.clock().now() + TEST_TIMEOUT, TEST_PRIORITY)); - - String bodyType = msg.getClass().getName(); - _context.messageHistory().wrap(bodyType, msg.getUniqueId(), TunnelMessage.class.getName(), tmsg.getUniqueId()); - } catch (IOException ioe) { - _log.error("Error writing out the tunnel message to send to the tunnel", ioe); - _pool.tunnelFailed(_id); - } catch (DataFormatException dfe) { - _log.error("Error writing out the tunnel message to send to the tunnel", dfe); - _pool.tunnelFailed(_id); + private TunnelId getOutboundTunnel() { + TunnelSelectionCriteria crit = new TunnelSelectionCriteria(); + crit.setMinimumTunnelsRequired(2); + crit.setMaximumTunnelsRequired(2); + // arbitrary priorities + crit.setAnonymityPriority(50); + crit.setLatencyPriority(50); + crit.setReliabilityPriority(50); + List tunnelIds = _context.tunnelManager().selectOutboundTunnelIds(crit); + + for (int i = 0; i < tunnelIds.size(); i++) { + TunnelId id = (TunnelId)tunnelIds.get(i); + if (id.equals(_primaryId)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Not testing a tunnel with itself [duh]"); + } else { + return id; + } } + + _log.error("Unable to test tunnel " + _primaryId + ", since there are NO OTHER OUTBOUND TUNNELS to send the ack through"); + return null; } private class TestFailedJob extends JobImpl { @@ -167,8 +200,17 @@ class TestTunnelJob extends JobImpl { public String getName() { return "Tunnel Test Failed"; } public void runJob() { if (_log.shouldLog(Log.WARN)) - _log.warn("Test of tunnel " + _id.getTunnelId() + " failed while waiting for nonce " + _nonce, getAddedBy()); - _pool.tunnelFailed(_id); + _log.warn("Test of tunnel " + _primaryId.getTunnelId() + + " failed while waiting for nonce " + _nonce + ": " + + _pool.getTunnelInfo(_primaryId), getAddedBy()); + _pool.tunnelFailed(_primaryId); + if (_secondaryId != null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Secondary test of tunnel " + _secondaryId.getTunnelId() + + " failed while waiting for nonce " + _nonce + ": " + + _pool.getTunnelInfo(_secondaryId), getAddedBy()); + _pool.tunnelFailed(_secondaryId); + } } } @@ -183,10 +225,30 @@ class TestTunnelJob extends JobImpl { public void runJob() { long time = (_context.clock().now() - _msg.getArrival().getTime()); if (_log.shouldLog(Log.INFO)) - _log.info("Test of tunnel " + _id+ " successfull after " + time + "ms waiting for " + _nonce); - TunnelInfo info = _pool.getTunnelInfo(_id); - if (info != null) + _log.info("Test of tunnel " + _primaryId+ " successfull after " + + time + "ms waiting for " + _nonce); + TunnelInfo info = _pool.getTunnelInfo(_primaryId); + if (info != null) { TestTunnelJob.this._context.messageHistory().tunnelValid(info, time); + updateProfiles(info, time); + } + + info = _pool.getTunnelInfo(_secondaryId); + if (info != null) { + TestTunnelJob.this._context.messageHistory().tunnelValid(info, time); + updateProfiles(info, time); + } + _context.statManager().addRateData("tunnel.testSuccessTime", time, time); + } + + private void updateProfiles(TunnelInfo info, long time) { + TunnelInfo cur = info; + while (cur != null) { + Hash peer = cur.getThisHop(); + if ( (peer != null) && (!_context.routerHash().equals(peer)) ) + _context.profileManager().tunnelTestSucceeded(peer, time); + cur = cur.getNextHopInfo(); + } } public void setMessage(I2NPMessage message) { @@ -205,15 +267,16 @@ class TestTunnelJob extends JobImpl { _found = false; _expiration = _context.clock().now() + TEST_TIMEOUT; if (_log.shouldLog(Log.DEBUG)) - _log.debug("the expiration while testing tunnel " + tunnelId + " waiting for nonce " + id + ": " + new Date(_expiration)); + _log.debug("the expiration while testing tunnel " + tunnelId + + " waiting for nonce " + id + ": " + new Date(_expiration)); } public boolean continueMatching() { if (!_found) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Continue matching while looking for nonce for tunnel " + _tunnelId); } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Don't continue matching for tunnel " + _tunnelId + " / " + _id); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Don't continue matching for tunnel " + _tunnelId + " / " + _id); } return !_found; } @@ -229,12 +292,15 @@ class TestTunnelJob extends JobImpl { DeliveryStatusMessage msg = (DeliveryStatusMessage)message; if (msg.getMessageId() == _id) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Found successful test of tunnel " + _tunnelId + " after " + (_context.clock().now() - msg.getArrival().getTime()) + "ms waiting for " + _id); + _log.debug("Found successful test of tunnel " + _tunnelId + " after " + + (_context.clock().now() - msg.getArrival().getTime()) + + "ms waiting for " + _id); _found = true; return true; } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Found a delivery status message, but it contains nonce " + msg.getMessageId() + " and not " + _id); + _log.debug("Found a delivery status message, but it contains nonce " + + msg.getMessageId() + " and not " + _id); } } else { //_log.debug("Not a match while looking to test tunnel " + _tunnelId + " with nonce " + _id + " (" + message + ")"); @@ -244,7 +310,8 @@ class TestTunnelJob extends JobImpl { public String toString() { StringBuffer buf = new StringBuffer(256); buf.append(super.toString()); - buf.append(": TestMessageSelector: tunnel ").append(_tunnelId).append(" looking for ").append(_id).append(" expiring on "); + buf.append(": TestMessageSelector: tunnel ").append(_tunnelId); + buf.append(" looking for ").append(_id).append(" expiring on "); buf.append(new Date(_expiration)); return buf.toString(); } diff --git a/router/java/src/net/i2p/router/tunnelmanager/TunnelTestManager.java b/router/java/src/net/i2p/router/tunnelmanager/TunnelTestManager.java index 5160b0e8ff..8ba2865f0f 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TunnelTestManager.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TunnelTestManager.java @@ -32,16 +32,13 @@ class TunnelTestManager { private TunnelPool _pool; private boolean _stopTesting; - private final static long MINIMUM_RETEST_DELAY = 60*1000; // dont test tunnels more than once every 30 seconds - - /** avg # tests per tunnel lifetime that we want */ - private final static int TESTS_PER_DURATION = 2; - /** how many times we'll be able to try the tests (this should take into consideration user prefs, but fsck it for now) */ - private final static int CHANCES_PER_DURATION = 8; + /** dont test any particular tunnel more than once a minute */ + private final static long MINIMUM_RETEST_DELAY = 60*1000; public TunnelTestManager(RouterContext ctx, TunnelPool pool) { _context = ctx; _log = ctx.logManager().getLog(TunnelTestManager.class); + ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long do successful tunnel tests take?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); _pool = pool; _stopTesting = false; _context.jobQueue().addJob(new CoordinateTunnelTestingJob()); @@ -61,18 +58,28 @@ class TunnelTestManager { // skip not ready tunnels } else if (info.getSettings().getExpiration() < now + MINIMUM_RETEST_DELAY) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Tunnel " + id.getTunnelId() + " will be expiring within the current period (" + new Date(info.getSettings().getExpiration()) + "), so skip testing it"); + _log.debug("Tunnel " + id.getTunnelId() + + " will be expiring within the current period (" + + new Date(info.getSettings().getExpiration()) + + "), so skip testing it"); } else if (info.getSettings().getCreated() + MINIMUM_RETEST_DELAY < now) { - double probability = TESTS_PER_DURATION / (allIds.size() * CHANCES_PER_DURATION); - if (_context.random().nextInt(10) <= (probability*10d)) { - toTest.add(id); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Tunnel " + id.getTunnelId() + " could be tested, but probabilistically isn't going to be"); + // we're past the initial buffer period + if (info.getLastTested() + MINIMUM_RETEST_DELAY < now) { + // we haven't tested this tunnel in the minimum delay, so maybe we + // should. + if (_context.random().nextBoolean()) { + toTest.add(id); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We could have tested tunnel " + id.getTunnelId() + + ", but randomly decided not to."); + } } } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Tunnel " + id.getTunnelId() + " was just created (" + new Date(info.getSettings().getCreated()) + "), wait until the next pass to test it"); + _log.debug("Tunnel " + id.getTunnelId() + " was just created (" + + new Date(info.getSettings().getCreated()) + + "), wait until the next pass to test it"); } } else { if (_log.shouldLog(Log.WARN)) @@ -112,11 +119,8 @@ class TunnelTestManager { } private void reschedule() { - long minNext = TunnelTestManager.this._context.clock().now() + MINIMUM_RETEST_DELAY; - long nxt = minNext + TunnelTestManager.this._context.random().nextInt(60*1000); // test tunnels once every 30-90 seconds + long nxt = TunnelTestManager.this._context.clock().now() + 30*1000; getTiming().setStartAfter(nxt); - if (_log.shouldLog(Log.INFO)) - _log.info("Rescheduling tunnel tests for " + new Date(nxt)); TunnelTestManager.this._context.jobQueue().addJob(CoordinateTunnelTestingJob.this); } }