Lots of updates. I'm not calling this 0.3.1.2, still need to

"burn it it" some more, but its looking good.
* test all tunnels we manage every period or two. later we'll want to include some randomization to help fight traffic analysis, but that falls into the i2p 3.0 tunnel chaff / mixing / etc)
* test inbound tunnels correctly (use an outbound tunnel, not direct)
* only give the tunnels 30 seconds to succeed
* mark the tunnel as tested for both the inbound and outbound side and adjust the profiles for all participants accordingly
* keep track of the 'last test time' on a tunnel
* new tunnel test response time profile stat, as well as overall router stat (published in the netDb as "tunnel.testSuccessTime")
* rewrite of the speed calculator - the value it generates now is essentially "how many round trip messages can this router pass in a minute".
  it also allows a few runtime configurable options:
  = speedCalculator.eventThreshold:
    we use the smallest measurement period that has at least this many events in it (10m, 60m, 24h, lifetime)
  = speedCalculator.useInstantaneousRates:
    when we use the estimated round trip time, do we use instantaneous or period averages?
  = speedCalculator.useTunnelTestOnly:
    do we only use the tunnel test time (no db response or tunnel create time, or even estimated round trip times)?
* fix the reliability calculator to use the 10 minute tunnel create successes, not the (almost always 0) 1 minute rate.
* persist the tunnel create response time and send success time correctly (duh)
* add a new main() to PeerProfile - PeerProfile [filename]* will calculate the values of the peer profiles specified.  useful for tweaking the calculator, and/or the configurable options.  ala:
     java -DspeedCalculator.useInstantaneousRates peerProfiles/profile-*.dat
This commit is contained in:
jrandom
2004-05-13 04:32:26 +00:00
committed by zzz
parent c7895ed905
commit ad9dd9a2e2
10 changed files with 521 additions and 116 deletions

View File

@ -50,6 +50,13 @@ public interface ProfileManager {
*/
void tunnelRejected(Hash peer, long responseTimeMs);
/**
* Note that a tunnel that the router is participating in
* was successfully tested with the given round trip latency
*
*/
void tunnelTestSucceeded(Hash peer, long responseTimeMs);
/**
* Note that the peer participated in a tunnel that failed. Its failure may not have
* been the peer's fault however.

View File

@ -104,6 +104,7 @@ public class StatisticsManager implements Service {
includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 });
includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 });
includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l });
includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 });

View File

@ -48,6 +48,7 @@ public class TunnelInfo extends DataStructureImpl {
private Properties _options;
private TunnelSettings _settings;
private long _created;
private long _lastTested;
private boolean _ready;
private boolean _wasEverReady;
@ -67,6 +68,7 @@ public class TunnelInfo extends DataStructureImpl {
_ready = false;
_wasEverReady = false;
_created = _context.clock().now();
_lastTested = -1;
}
public TunnelId getTunnelId() { return _id; }
@ -142,6 +144,10 @@ public class TunnelInfo extends DataStructureImpl {
public long getCreated() { return _created; }
/** when was the peer last tested (or -1 if never)? */
public long getLastTested() { return _lastTested; }
public void setLastTested(long when) { _lastTested = when; }
/**
* Number of hops left in the tunnel (including this one)
*

View File

@ -4,8 +4,9 @@ import net.i2p.data.Hash;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
import net.i2p.router.RouterContext;
import java.io.File;
class PeerProfile {
public class PeerProfile {
private Log _log;
private RouterContext _context;
// whoozaat?
@ -22,6 +23,7 @@ class PeerProfile {
private RateStat _receiveSize = null;
private RateStat _dbResponseTime = null;
private RateStat _tunnelCreateResponseTime = null;
private RateStat _tunnelTestResponseTime = null;
private RateStat _commError = null;
private RateStat _dbIntroduction = null;
// calculation bonuses
@ -63,7 +65,7 @@ class PeerProfile {
public void setPeer(Hash peer) { _peer = peer; }
/**
* are we keeping an expanded profile on the peer, or just the bare minimum?
* are we keeping an expanded profile on the peer, or just the bare minimum.
* If we aren't keeping the expanded profile, all of the rates as well as the
* TunnelHistory and DBHistory will not be available.
*
@ -123,7 +125,9 @@ class PeerProfile {
public RateStat getDbResponseTime() { return _dbResponseTime; }
/** how long it takes to get a tunnel create response from the peer (in milliseconds), calculated over a 1 minute, 1 hour, and 1 day period */
public RateStat getTunnelCreateResponseTime() { return _tunnelCreateResponseTime; }
/** how long between communication errors with the peer (e.g. disconnection), calculated over a 1 minute, 1 hour, and 1 day period */
/** how long it takes to successfully test a tunnel this peer participates in (in milliseconds), calculated over a 10 minute, 1 hour, and 1 day period */
public RateStat getTunnelTestResponseTime() { return _tunnelTestResponseTime; }
/** how long between communication errors with the peer (disconnection, etc), calculated over a 1 minute, 1 hour, and 1 day period */
public RateStat getCommError() { return _commError; }
/** how many new peers we get from dbSearchReplyMessages or dbStore messages, calculated over a 1 hour, 1 day, and 1 week period */
public RateStat getDbIntroduction() { return _dbIntroduction; }
@ -160,7 +164,7 @@ class PeerProfile {
*/
public double getSpeedValue() { return _speedValue; }
/**
* How likely are they to stay up and pass on messages over the next few minutes?
* How likely are they to stay up and pass on messages over the next few minutes.
* Positive numbers means more likely, negative numbers means its probably not
* even worth trying.
*
@ -189,6 +193,7 @@ class PeerProfile {
_receiveSize = null;
_dbResponseTime = null;
_tunnelCreateResponseTime = null;
_tunnelTestResponseTime = null;
_commError = null;
_dbIntroduction = null;
_tunnelHistory = null;
@ -212,9 +217,11 @@ class PeerProfile {
if (_receiveSize == null)
_receiveSize = new RateStat("receiveSize", "How large received messages are", "profile", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbResponseTime == null)
_dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } );
_dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelCreateResponseTime == null)
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } );
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelTestResponseTime == null)
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_commError == null)
_commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbIntroduction == null)
@ -238,6 +245,7 @@ class PeerProfile {
_sendFailureSize.coallesceStats();
_sendSuccessSize.coallesceStats();
_tunnelCreateResponseTime.coallesceStats();
_tunnelTestResponseTime.coallesceStats();
_dbHistory.coallesceStats();
_speedValue = calculateSpeed();
@ -270,7 +278,7 @@ class PeerProfile {
* for an expanded profile, and ~212 bytes for a compacted one.
*
*/
public static void main(String args[]) {
public static void main2(String args[]) {
RouterContext ctx = new RouterContext(null);
testProfileSize(ctx, 100, 0); // 560KB
testProfileSize(ctx, 1000, 0); // 3.9MB
@ -280,6 +288,36 @@ class PeerProfile {
testProfileSize(ctx, 0, 300000); // 63MB
}
/**
* Read in all of the profiles specified and print out
* their calculated values. Usage: <pre>
* PeerProfile [filename]*
* </pre>
*/
public static void main(String args[]) {
RouterContext ctx = new RouterContext(new net.i2p.router.Router());
ProfilePersistenceHelper helper = new ProfilePersistenceHelper(ctx);
try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
StringBuffer buf = new StringBuffer(1024);
for (int i = 0; i < args.length; i++) {
PeerProfile profile = helper.readProfile(new File(args[i]));
if (profile == null) {
buf.append("Could not load profile ").append(args[i]).append('\n');
continue;
}
//profile.coallesceStats();
buf.append("Peer " + profile.getPeer().toBase64()
+ ":\t Speed:\t" + profile.calculateSpeed()
+ " Reliability:\t" + profile.calculateReliability()
+ " Integration:\t" + profile.calculateIntegration()
+ " Active?\t" + profile.getIsActive()
+ " Failing?\t" + profile.calculateIsFailing()
+ '\n');
}
try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
System.out.println(buf.toString());
}
private static void testProfileSize(RouterContext ctx, int numExpanded, int numCompact) {
Runtime.getRuntime().gc();
PeerProfile profs[] = new PeerProfile[numExpanded];

View File

@ -101,6 +101,17 @@ public class ProfileManagerImpl implements ProfileManager {
data.getTunnelHistory().incrementRejected();
}
/**
* Note that a tunnel that the router is participating in
* was successfully tested with the given round trip latency
*
*/
public void tunnelTestSucceeded(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs);
}
/**
* Note that the peer participated in a tunnel that failed. Its failure may not have
* been the peer's fault however.

View File

@ -123,7 +123,9 @@ class ProfilePersistenceHelper {
profile.getDbResponseTime().store(out, "dbResponseTime");
profile.getReceiveSize().store(out, "receiveSize");
profile.getSendFailureSize().store(out, "sendFailureSize");
profile.getSendSuccessSize().store(out, "tunnelCreateResponseTime");
profile.getSendSuccessSize().store(out, "sendSuccessSize");
profile.getTunnelCreateResponseTime().store(out, "tunnelCreateResponseTime");
profile.getTunnelTestResponseTime().store(out, "tunnelTestResponseTime");
}
}
@ -154,10 +156,13 @@ class ProfilePersistenceHelper {
rv.add(files[i]);
return rv;
}
private PeerProfile readProfile(File file) {
public PeerProfile readProfile(File file) {
Hash peer = getHash(file.getName());
try {
if (peer == null) return null;
if (peer == null) {
_log.error("The file " + file.getName() + " is not a valid hash");
return null;
}
PeerProfile profile = new PeerProfile(_context, peer);
Properties props = new Properties();
@ -181,7 +186,9 @@ class ProfilePersistenceHelper {
profile.getDbResponseTime().load(props, "dbResponseTime", true);
profile.getReceiveSize().load(props, "receiveSize", true);
profile.getSendFailureSize().load(props, "sendFailureSize", true);
profile.getSendSuccessSize().load(props, "tunnelCreateResponseTime", true);
profile.getSendSuccessSize().load(props, "sendSuccessSize", true);
profile.getTunnelCreateResponseTime().load(props, "tunnelCreateResponseTime", true);
profile.getTunnelTestResponseTime().load(props, "tunnelTestResponseTime", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loaded the profile for " + peer.toBase64() + " from " + file.getName());
@ -222,7 +229,7 @@ class ProfilePersistenceHelper {
props.setProperty(key, val);
}
} catch (IOException ioe) {
_log.error("Error loading properties from " + file.getName(), ioe);
_log.warn("Error loading properties from " + file.getName(), ioe);
} finally {
if (in != null) try { in.close(); } catch (IOException ioe) {}
}
@ -237,6 +244,7 @@ class ProfilePersistenceHelper {
h.fromBase64(key);
return h;
} catch (DataFormatException dfe) {
_log.warn("Invalid base64 [" + key + "]", dfe);
return null;
}
}
@ -247,10 +255,16 @@ class ProfilePersistenceHelper {
private File getProfileDir() {
if (_profileDir == null) {
String dir = _context.router().getConfigSetting(PROP_PEER_PROFILE_DIR);
if (dir == null) {
_log.info("No peer profile dir specified [" + PROP_PEER_PROFILE_DIR + "], using [" + DEFAULT_PEER_PROFILE_DIR + "]");
dir = DEFAULT_PEER_PROFILE_DIR;
String dir = null;
if (_context.router() == null) {
dir = _context.getProperty(PROP_PEER_PROFILE_DIR, DEFAULT_PEER_PROFILE_DIR);
} else {
dir = _context.router().getConfigSetting(PROP_PEER_PROFILE_DIR);
if (dir == null) {
_log.info("No peer profile dir specified [" + PROP_PEER_PROFILE_DIR
+ "], using [" + DEFAULT_PEER_PROFILE_DIR + "]");
dir = DEFAULT_PEER_PROFILE_DIR;
}
}
_profileDir = new File(dir);
}

View File

@ -29,8 +29,7 @@ public class ReliabilityCalculator extends Calculator {
val += profile.getSendSuccessSize().getRate(60*60*1000).getLastEventCount();
val += profile.getSendSuccessSize().getRate(60*60*1000).getCurrentEventCount();
val += profile.getTunnelCreateResponseTime().getRate(60*1000).getCurrentEventCount() * 10;
val += profile.getTunnelCreateResponseTime().getRate(60*1000).getLastEventCount() * 5;
val += profile.getTunnelCreateResponseTime().getRate(10*60*1000).getLastEventCount() * 5;
val += profile.getTunnelCreateResponseTime().getRate(60*60*1000).getCurrentEventCount();
val += profile.getTunnelCreateResponseTime().getRate(60*60*1000).getLastEventCount();

View File

@ -8,60 +8,318 @@ import net.i2p.router.RouterContext;
/**
* Quantify how fast the peer is - how fast they respond to our requests, how fast
* they pass messages on, etc. This should be affected both by their bandwidth/latency,
* as well as their load.
* as well as their load. The essence of the current algorithm is to determine
* approximately how many 2KB messages the peer can pass round trip within a single
* minute - not based just on itself though, but including the delays of other peers
* in the tunnels. As such, more events make it more accurate.
*
*/
public class SpeedCalculator extends Calculator {
private Log _log;
private RouterContext _context;
/**
* minimum number of events to use a particular period's data. If this many
* events haven't occurred in the period yet, the next largest period is tried.
*/
public static final String PROP_EVENT_THRESHOLD = "speedCalculator.eventThreshold";
public static final int DEFAULT_EVENT_THRESHOLD = 50;
/** should the calculator use instantaneous rates, or period averages? */
public static final String PROP_USE_INSTANTANEOUS_RATES = "speedCalculator.useInstantaneousRates";
public static final boolean DEFAULT_USE_INSTANTANEOUS_RATES = false;
/** should the calculator use tunnel test time only, or include all data? */
public static final String PROP_USE_TUNNEL_TEST_ONLY = "speedCalculator.useTunnelTestOnly";
public static final boolean DEFAULT_USE_TUNNEL_TEST_ONLY = true;
public SpeedCalculator(RouterContext context) {
_context = context;
_log = context.logManager().getLog(SpeedCalculator.class);
}
public double calc(PeerProfile profile) {
double dbResponseTime = profile.getDbResponseTime().getRate(60*1000).getLifetimeAverageValue();
double tunnelResponseTime = profile.getTunnelCreateResponseTime().getRate(60*1000).getLifetimeAverageValue();
double roundTripRate = Math.max(dbResponseTime, tunnelResponseTime);
long threshold = getEventThreshold();
boolean tunnelTestOnly = getUseTunnelTestOnly();
// send and receive rates are the (period rate) * (saturation %)
double sendRate = calcSendRate(profile);
double receiveRate = calcReceiveRate(profile);
long period = 10*60*1000;
long events = getEventCount(profile, period, tunnelTestOnly);
if (events < threshold) {
period = 60*60*1000l;
events = getEventCount(profile, period, tunnelTestOnly);
if (events < threshold) {
period = 24*60*60*1000;
events = getEventCount(profile, period, tunnelTestOnly);
if (events < threshold) {
period = -1;
events = getEventCount(profile, period, tunnelTestOnly);
}
}
}
double measuredRoundTripTime = getMeasuredRoundTripTime(profile, period, tunnelTestOnly);
double measuredRTPerMinute = 0;
if (measuredRoundTripTime > 0)
measuredRTPerMinute = (60000.0d / measuredRoundTripTime);
double val = 60000.0d - 0.1*roundTripRate + sendRate + receiveRate;
// if we don't have any data, the rate is 0
if ( (roundTripRate == 0.0d) && (sendRate == 0.0d) )
val = 0.0;
double estimatedRTPerMinute = 0;
double estimatedRoundTripTime = 0;
if (!tunnelTestOnly) {
estimatedRoundTripTime = getEstimatedRoundTripTime(profile, period);
if (estimatedRoundTripTime > 0)
estimatedRTPerMinute = (60000.0d / estimatedRoundTripTime);
}
double estimateFactor = getEstimateFactor(threshold, events);
double rv = (1-estimateFactor)*measuredRTPerMinute + (estimateFactor)*estimatedRTPerMinute;
if (_log.shouldLog(Log.DEBUG))
_log.debug("roundTripRate: " + roundTripRate + "ms sendRate: " + sendRate + "bytes/second, receiveRate: " + receiveRate + "bytes/second, val: " + val + " for " + profile.getPeer().toBase64());
val += profile.getSpeedBonus();
return val;
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("\n\nrv: " + rv + " events: " + events + " threshold: " + threshold + " period: " + period + " useTunnelTestOnly? " + tunnelTestOnly + "\n"
+ "measuredRTT: " + measuredRoundTripTime + " measured events per minute: " + measuredRTPerMinute + "\n"
+ "estimateRTT: " + estimatedRoundTripTime + " estimated events per minute: " + estimatedRTPerMinute + "\n"
+ "estimateFactor: " + estimateFactor + "\n"
+ "for peer: " + profile.getPeer().toBase64());
}
rv += profile.getSpeedBonus();
return rv;
}
/**
* How much do we want to prefer the measured values more than the estimated
* values, as a fraction. The value 1 means ignore the measured values, while
* the value 0 means ignore the estimate, and everything inbetween means, well
* everything inbetween.
*
*/
private double getEstimateFactor(long eventThreshold, long numEvents) {
if (numEvents > eventThreshold)
return 0.0d;
else
return numEvents / eventThreshold;
}
private double calcSendRate(PeerProfile profile) { return calcRate(profile.getSendSuccessSize()); }
private double calcReceiveRate(PeerProfile profile) { return calcRate(profile.getReceiveSize()); }
private double calcRate(RateStat stat) {
double rate = 0.0d;
Rate hourRate = stat.getRate(60*60*1000);
rate = calcRate(hourRate);
return rate;
/**
* How many measured events do we have for the given period? If the period is negative,
* return the lifetime events.
*
*/
private long getEventCount(PeerProfile profile, long period, boolean tunnelTestOnly) {
if (period < 0) {
Rate dbResponseRate = profile.getDbResponseTime().getRate(60*60*1000l);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(60*60*1000l);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(60*60*1000l);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeEventCount();
long tunnelTests = tunnelTestRate.getLifetimeEventCount();
return dbResponses + tunnelResponses + tunnelTests;
} else {
Rate dbResponseRate = profile.getDbResponseTime().getRate(period);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(period);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(period);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getCurrentEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getCurrentEventCount();
long tunnelTests = tunnelTestRate.getCurrentEventCount();
if (_log.shouldLog(Log.DEBUG))
_log.debug("TunnelTests for period " + period + ": " + tunnelTests +
" last: " + tunnelTestRate.getLastEventCount() + " lifetime: " +
tunnelTestRate.getLifetimeEventCount());
return dbResponses + tunnelResponses + tunnelTests;
}
}
private double calcRate(Rate rate) {
long events = rate.getLastEventCount() + rate.getCurrentEventCount();
/**
* Retrieve the average measured round trip time within the period specified (including
* db responses, tunnel create responses, and tunnel tests). If the period is negative,
* it uses the lifetime stats. In addition, it weights each of those three measurements
* equally according to their event count (e.g. 4 dbResponses @ 10 seconds and 1 tunnel test
* at 5 seconds will leave the average at 9 seconds)
*
*/
private double getMeasuredRoundTripTime(PeerProfile profile, long period, boolean tunnelTestOnly) {
if (period < 0) {
Rate dbResponseRate = profile.getDbResponseTime().getRate(60*60*1000l);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(60*60*1000l);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(60*60*1000l);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeEventCount();
long tunnelTests = tunnelTestRate.getLifetimeEventCount();
double dbResponseTime = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeAverageValue();
double tunnelResponseTime = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeAverageValue();
double tunnelTestTime = tunnelTestRate.getLifetimeAverageValue();
long events = dbResponses + tunnelResponses + tunnelTests;
if (events <= 0) return 0;
return (dbResponses*dbResponseTime + tunnelResponses*tunnelResponseTime + tunnelTests*tunnelTestTime)
/ events;
} else {
Rate dbResponseRate = profile.getDbResponseTime().getRate(period);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(period);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(period);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getCurrentEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getCurrentEventCount();
long tunnelTests = tunnelTestRate.getCurrentEventCount();
double dbResponseTime = tunnelTestOnly ? 0 : dbResponseRate.getAverageValue();
double tunnelResponseTime = tunnelTestOnly ? 0 : tunnelResponseRate.getAverageValue();
double tunnelTestTime = tunnelTestRate.getAverageValue();
long events = dbResponses + tunnelResponses + tunnelTests;
if (events <= 0) return 0;
return (dbResponses*dbResponseTime + tunnelResponses*tunnelResponseTime + tunnelTests*tunnelTestTime)
/ events;
}
}
private double getEstimatedRoundTripTime(PeerProfile profile, long period) {
double estSendTime = getEstimatedSendTime(profile, period);
double estRecvTime = getEstimatedReceiveTime(profile, period);
return estSendTime + estRecvTime;
}
private double getEstimatedSendTime(PeerProfile profile, long period) {
double bps = calcRate(profile.getSendSuccessSize(), period);
if (bps <= 0)
return 0.0d;
else
return 2048.0d / bps;
}
private double getEstimatedReceiveTime(PeerProfile profile, long period) {
double bps = calcRate(profile.getReceiveSize(), period);
if (bps <= 0)
return 0.0d;
else
return 2048.0d / bps;
}
private double calcRate(RateStat stat, long period) {
Rate rate = stat.getRate(period);
if (rate == null) return 0.0d;
return calcRate(rate, period);
}
private double calcRate(Rate rate, long period) {
long events = rate.getCurrentEventCount();
if (events >= 1) {
double ms = rate.getLastTotalEventTime() + rate.getCurrentTotalEventTime();
double bytes = rate.getLastTotalValue() + rate.getCurrentTotalValue();
double ms = rate.getCurrentTotalEventTime();
double bytes = rate.getCurrentTotalValue();
if (_log.shouldLog(Log.DEBUG))
_log.debug("calculating rate: ms=" + ((int)ms) + " bytes=" + ((int)bytes));
if ( (bytes > 0) && (ms > 0) ) {
return (bytes * 1000.0d) / ms;
if (getUseInstantaneousRates()) {
return (bytes * 1000.0d) / ms;
} else {
// period average
return (bytes * 1000.0d) / period;
}
}
}
return 0.0d;
}
/**
* What is the minimum number of measured events we want in a period before
* trusting the values? This first checks the router's configuration, then
* the context, and then finally falls back on a static default (100).
*
*/
private long getEventThreshold() {
if (_context.router() != null) {
String threshold = _context.router().getConfigSetting(PROP_EVENT_THRESHOLD);
if (threshold != null) {
try {
return Long.parseLong(threshold);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Event threshold for speed improperly set in the router config [" + threshold + "]", nfe);
}
}
}
String threshold = _context.getProperty(PROP_EVENT_THRESHOLD, ""+DEFAULT_EVENT_THRESHOLD);
if (threshold != null) {
try {
return Long.parseLong(threshold);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Event threshold for speed improperly set in the router environment [" + threshold + "]", nfe);
}
}
return DEFAULT_EVENT_THRESHOLD;
}
/**
* Should we use instantaneous rates for the estimated speed, or the period rates?
* This first checks the router's configuration, then the context, and then
* finally falls back on a static default (true).
*
* @return true if we should use instantaneous rates, false if we should use period averages
*/
private boolean getUseInstantaneousRates() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_USE_INSTANTANEOUS_RATES);
if (val != null) {
try {
return Boolean.getBoolean(val);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Instantaneous rate for speed improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_USE_INSTANTANEOUS_RATES, ""+DEFAULT_USE_INSTANTANEOUS_RATES);
if (val != null) {
try {
return Boolean.getBoolean(val);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Instantaneous rate for speed improperly set in the router environment [" + val + "]", nfe);
}
}
return DEFAULT_USE_INSTANTANEOUS_RATES;
}
/**
* Should we only use the measured tunnel testing time, or should we include
* measurements on the db responses and tunnel create responses. This first
* checks the router's configuration, then the context, and then finally falls
* back on a static default (true).
*
* @return true if we should use tunnel test time only, false if we should use all available
*/
private boolean getUseTunnelTestOnly() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_USE_TUNNEL_TEST_ONLY);
if (val != null) {
try {
boolean rv = Boolean.getBoolean(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_USE_TUNNEL_TEST_ONLY + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel test only for speed improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_USE_TUNNEL_TEST_ONLY, ""+DEFAULT_USE_TUNNEL_TEST_ONLY);
if (val != null) {
try {
boolean rv = Boolean.getBoolean(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router context said " + PROP_USE_TUNNEL_TEST_ONLY + '=' + val);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel test only for speed improperly set in the router environment [" + val + "]", nfe);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("no config for " + PROP_USE_TUNNEL_TEST_ONLY + ", using " + DEFAULT_USE_TUNNEL_TEST_ONLY);
return DEFAULT_USE_TUNNEL_TEST_ONLY;
}
}

View File

@ -37,26 +37,33 @@ import net.i2p.router.RouterContext;
class TestTunnelJob extends JobImpl {
private Log _log;
private TunnelId _id;
/** tunnel that we want to test */
private TunnelId _primaryId;
/** tunnel that is used to help test the primary id */
private TunnelId _secondaryId;
private TunnelPool _pool;
private long _nonce;
public TestTunnelJob(RouterContext ctx, TunnelId id, TunnelPool pool) {
super(ctx);
_log = ctx.logManager().getLog(TestTunnelJob.class);
_id = id;
_primaryId = id;
_pool = pool;
_nonce = ctx.random().nextInt(Integer.MAX_VALUE);
}
public String getName() { return "Test Tunnel"; }
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info("Testing tunnel " + _id.getTunnelId());
TunnelInfo info = _pool.getTunnelInfo(_id);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Testing tunnel " + _primaryId.getTunnelId());
TunnelInfo info = _pool.getTunnelInfo(_primaryId);
if (info == null) {
_log.error("wtf, why are we testing a tunnel that we do not know about? [" + _id.getTunnelId() + "]", getAddedBy());
_log.error("wtf, why are we testing a tunnel that we do not know about? ["
+ _primaryId.getTunnelId() + "]", getAddedBy());
return;
}
// mark it as something we're testing
info.setLastTested(_context.clock().now());
if (isOutbound(info)) {
testOutbound(info);
} else {
@ -75,7 +82,7 @@ class TestTunnelJob extends JobImpl {
return false;
}
private final static long TEST_TIMEOUT = 60*1000; // 60 seconds for a test to succeed
private final static long TEST_TIMEOUT = 30*1000; // 30 seconds for a test to succeed
private final static int TEST_PRIORITY = 100;
/**
@ -83,22 +90,51 @@ class TestTunnelJob extends JobImpl {
* to ourselves and wait for it to arrive.
*/
private void testOutbound(TunnelInfo info) {
if (_log.shouldLog(Log.INFO))
_log.info("Testing outbound tunnel " + info);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Testing outbound tunnel " + info);
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setArrival(new Date(_context.clock().now()));
msg.setMessageId(_nonce);
Hash us = _context.routerHash();
TunnelId inboundTunnelId = getReplyTunnel();
if (inboundTunnelId == null) {
_secondaryId = getReplyTunnel();
if (_secondaryId == null) {
_context.jobQueue().addJob(new TestFailedJob());
return;
}
TunnelInfo inboundInfo = _pool.getTunnelInfo(_secondaryId);
inboundInfo.setLastTested(_context.clock().now());
TestFailedJob failureJob = new TestFailedJob();
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
SendTunnelMessageJob testJob = new SendTunnelMessageJob(_context, msg, info.getTunnelId(), us, inboundTunnelId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY);
SendTunnelMessageJob testJob = new SendTunnelMessageJob(_context, msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY);
_context.jobQueue().addJob(testJob);
}
/**
* Send a message to the gateway and wait for it to arrive.
*/
private void testInbound(TunnelInfo info) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Testing inbound tunnel " + info);
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setArrival(new Date(_context.clock().now()));
msg.setMessageId(_nonce);
_secondaryId = getOutboundTunnel();
if (_secondaryId == null) {
_context.jobQueue().addJob(new TestFailedJob());
return;
}
TunnelInfo outboundInfo = _pool.getTunnelInfo(_secondaryId);
outboundInfo.setLastTested(_context.clock().now());
TestFailedJob failureJob = new TestFailedJob();
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
SendTunnelMessageJob j = new SendTunnelMessageJob(_context, msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, _context.clock().now()+TEST_TIMEOUT, TEST_PRIORITY);
_context.jobQueue().addJob(j);
}
/**
* Get the tunnel for replies to be sent down when testing outbound tunnels
@ -116,7 +152,7 @@ class TestTunnelJob extends JobImpl {
for (int i = 0; i < tunnelIds.size(); i++) {
TunnelId id = (TunnelId)tunnelIds.get(i);
if (id.equals(_id)) {
if (id.equals(_primaryId)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not testing a tunnel with itself [duh]");
} else {
@ -124,39 +160,36 @@ class TestTunnelJob extends JobImpl {
}
}
_log.error("Unable to test tunnel " + _id + ", since there are NO OTHER INBOUND TUNNELS to receive the ack through");
_log.error("Unable to test tunnel " + _primaryId + ", since there are NO OTHER INBOUND TUNNELS to receive the ack through");
return null;
}
/**
* Send a message to the gateway and wait for it to arrive.
* todo: send the message to the gateway via an outbound tunnel or garlic, NOT DIRECT.
* Get the tunnel to send thte message out when testing inbound tunnels
*
*/
private void testInbound(TunnelInfo info) {
if (_log.shouldLog(Log.INFO))
_log.info("Testing inbound tunnel " + info);
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setArrival(new Date(_context.clock().now()));
msg.setMessageId(_nonce);
TestFailedJob failureJob = new TestFailedJob();
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
TunnelMessage tmsg = new TunnelMessage(_context);
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
msg.writeBytes(baos);
tmsg.setData(baos.toByteArray());
tmsg.setTunnelId(info.getTunnelId());
_context.jobQueue().addJob(new SendMessageDirectJob(_context, tmsg, info.getThisHop(), new TestSuccessfulJob(), failureJob, selector, _context.clock().now() + TEST_TIMEOUT, TEST_PRIORITY));
String bodyType = msg.getClass().getName();
_context.messageHistory().wrap(bodyType, msg.getUniqueId(), TunnelMessage.class.getName(), tmsg.getUniqueId());
} catch (IOException ioe) {
_log.error("Error writing out the tunnel message to send to the tunnel", ioe);
_pool.tunnelFailed(_id);
} catch (DataFormatException dfe) {
_log.error("Error writing out the tunnel message to send to the tunnel", dfe);
_pool.tunnelFailed(_id);
private TunnelId getOutboundTunnel() {
TunnelSelectionCriteria crit = new TunnelSelectionCriteria();
crit.setMinimumTunnelsRequired(2);
crit.setMaximumTunnelsRequired(2);
// arbitrary priorities
crit.setAnonymityPriority(50);
crit.setLatencyPriority(50);
crit.setReliabilityPriority(50);
List tunnelIds = _context.tunnelManager().selectOutboundTunnelIds(crit);
for (int i = 0; i < tunnelIds.size(); i++) {
TunnelId id = (TunnelId)tunnelIds.get(i);
if (id.equals(_primaryId)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not testing a tunnel with itself [duh]");
} else {
return id;
}
}
_log.error("Unable to test tunnel " + _primaryId + ", since there are NO OTHER OUTBOUND TUNNELS to send the ack through");
return null;
}
private class TestFailedJob extends JobImpl {
@ -167,8 +200,17 @@ class TestTunnelJob extends JobImpl {
public String getName() { return "Tunnel Test Failed"; }
public void runJob() {
if (_log.shouldLog(Log.WARN))
_log.warn("Test of tunnel " + _id.getTunnelId() + " failed while waiting for nonce " + _nonce, getAddedBy());
_pool.tunnelFailed(_id);
_log.warn("Test of tunnel " + _primaryId.getTunnelId()
+ " failed while waiting for nonce " + _nonce + ": "
+ _pool.getTunnelInfo(_primaryId), getAddedBy());
_pool.tunnelFailed(_primaryId);
if (_secondaryId != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Secondary test of tunnel " + _secondaryId.getTunnelId()
+ " failed while waiting for nonce " + _nonce + ": "
+ _pool.getTunnelInfo(_secondaryId), getAddedBy());
_pool.tunnelFailed(_secondaryId);
}
}
}
@ -183,10 +225,30 @@ class TestTunnelJob extends JobImpl {
public void runJob() {
long time = (_context.clock().now() - _msg.getArrival().getTime());
if (_log.shouldLog(Log.INFO))
_log.info("Test of tunnel " + _id+ " successfull after " + time + "ms waiting for " + _nonce);
TunnelInfo info = _pool.getTunnelInfo(_id);
if (info != null)
_log.info("Test of tunnel " + _primaryId+ " successfull after "
+ time + "ms waiting for " + _nonce);
TunnelInfo info = _pool.getTunnelInfo(_primaryId);
if (info != null) {
TestTunnelJob.this._context.messageHistory().tunnelValid(info, time);
updateProfiles(info, time);
}
info = _pool.getTunnelInfo(_secondaryId);
if (info != null) {
TestTunnelJob.this._context.messageHistory().tunnelValid(info, time);
updateProfiles(info, time);
}
_context.statManager().addRateData("tunnel.testSuccessTime", time, time);
}
private void updateProfiles(TunnelInfo info, long time) {
TunnelInfo cur = info;
while (cur != null) {
Hash peer = cur.getThisHop();
if ( (peer != null) && (!_context.routerHash().equals(peer)) )
_context.profileManager().tunnelTestSucceeded(peer, time);
cur = cur.getNextHopInfo();
}
}
public void setMessage(I2NPMessage message) {
@ -205,15 +267,16 @@ class TestTunnelJob extends JobImpl {
_found = false;
_expiration = _context.clock().now() + TEST_TIMEOUT;
if (_log.shouldLog(Log.DEBUG))
_log.debug("the expiration while testing tunnel " + tunnelId + " waiting for nonce " + id + ": " + new Date(_expiration));
_log.debug("the expiration while testing tunnel " + tunnelId
+ " waiting for nonce " + id + ": " + new Date(_expiration));
}
public boolean continueMatching() {
if (!_found) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Continue matching while looking for nonce for tunnel " + _tunnelId);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Don't continue matching for tunnel " + _tunnelId + " / " + _id);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Don't continue matching for tunnel " + _tunnelId + " / " + _id);
}
return !_found;
}
@ -229,12 +292,15 @@ class TestTunnelJob extends JobImpl {
DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
if (msg.getMessageId() == _id) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Found successful test of tunnel " + _tunnelId + " after " + (_context.clock().now() - msg.getArrival().getTime()) + "ms waiting for " + _id);
_log.debug("Found successful test of tunnel " + _tunnelId + " after "
+ (_context.clock().now() - msg.getArrival().getTime())
+ "ms waiting for " + _id);
_found = true;
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Found a delivery status message, but it contains nonce " + msg.getMessageId() + " and not " + _id);
_log.debug("Found a delivery status message, but it contains nonce "
+ msg.getMessageId() + " and not " + _id);
}
} else {
//_log.debug("Not a match while looking to test tunnel " + _tunnelId + " with nonce " + _id + " (" + message + ")");
@ -244,7 +310,8 @@ class TestTunnelJob extends JobImpl {
public String toString() {
StringBuffer buf = new StringBuffer(256);
buf.append(super.toString());
buf.append(": TestMessageSelector: tunnel ").append(_tunnelId).append(" looking for ").append(_id).append(" expiring on ");
buf.append(": TestMessageSelector: tunnel ").append(_tunnelId);
buf.append(" looking for ").append(_id).append(" expiring on ");
buf.append(new Date(_expiration));
return buf.toString();
}

View File

@ -32,16 +32,13 @@ class TunnelTestManager {
private TunnelPool _pool;
private boolean _stopTesting;
private final static long MINIMUM_RETEST_DELAY = 60*1000; // dont test tunnels more than once every 30 seconds
/** avg # tests per tunnel lifetime that we want */
private final static int TESTS_PER_DURATION = 2;
/** how many times we'll be able to try the tests (this should take into consideration user prefs, but fsck it for now) */
private final static int CHANCES_PER_DURATION = 8;
/** dont test any particular tunnel more than once a minute */
private final static long MINIMUM_RETEST_DELAY = 60*1000;
public TunnelTestManager(RouterContext ctx, TunnelPool pool) {
_context = ctx;
_log = ctx.logManager().getLog(TunnelTestManager.class);
ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long do successful tunnel tests take?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
_pool = pool;
_stopTesting = false;
_context.jobQueue().addJob(new CoordinateTunnelTestingJob());
@ -61,18 +58,28 @@ class TunnelTestManager {
// skip not ready tunnels
} else if (info.getSettings().getExpiration() < now + MINIMUM_RETEST_DELAY) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel " + id.getTunnelId() + " will be expiring within the current period (" + new Date(info.getSettings().getExpiration()) + "), so skip testing it");
_log.debug("Tunnel " + id.getTunnelId()
+ " will be expiring within the current period ("
+ new Date(info.getSettings().getExpiration())
+ "), so skip testing it");
} else if (info.getSettings().getCreated() + MINIMUM_RETEST_DELAY < now) {
double probability = TESTS_PER_DURATION / (allIds.size() * CHANCES_PER_DURATION);
if (_context.random().nextInt(10) <= (probability*10d)) {
toTest.add(id);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel " + id.getTunnelId() + " could be tested, but probabilistically isn't going to be");
// we're past the initial buffer period
if (info.getLastTested() + MINIMUM_RETEST_DELAY < now) {
// we haven't tested this tunnel in the minimum delay, so maybe we
// should.
if (_context.random().nextBoolean()) {
toTest.add(id);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("We could have tested tunnel " + id.getTunnelId()
+ ", but randomly decided not to.");
}
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel " + id.getTunnelId() + " was just created (" + new Date(info.getSettings().getCreated()) + "), wait until the next pass to test it");
_log.debug("Tunnel " + id.getTunnelId() + " was just created ("
+ new Date(info.getSettings().getCreated())
+ "), wait until the next pass to test it");
}
} else {
if (_log.shouldLog(Log.WARN))
@ -112,11 +119,8 @@ class TunnelTestManager {
}
private void reschedule() {
long minNext = TunnelTestManager.this._context.clock().now() + MINIMUM_RETEST_DELAY;
long nxt = minNext + TunnelTestManager.this._context.random().nextInt(60*1000); // test tunnels once every 30-90 seconds
long nxt = TunnelTestManager.this._context.clock().now() + 30*1000;
getTiming().setStartAfter(nxt);
if (_log.shouldLog(Log.INFO))
_log.info("Rescheduling tunnel tests for " + new Date(nxt));
TunnelTestManager.this._context.jobQueue().addJob(CoordinateTunnelTestingJob.this);
}
}