2005-03-14 jrandom

* New strict speed calculator that goes off the actual number of messages
      verifiably sent through the peer by way of tunnels.  Initially, this only
      contains the successful message count on inbound tunnels, but may be
      augmented later to include verified outbound messages, peers queried in
      the netDb, etc.  The speed calculation decays quickly, but should give
      a better differential than the previous stat (both values are shown on
      the /profiles.jsp page)
This commit is contained in:
jrandom
2005-03-15 03:47:14 +00:00
committed by zzz
parent f9aa3aef18
commit b20aee6753
11 changed files with 185 additions and 11 deletions

View File

@ -1,4 +1,13 @@
$Id: history.txt,v 1.167 2005/03/07 21:45:14 jrandom Exp $
$Id: history.txt,v 1.168 2005/03/11 17:23:41 jrandom Exp $
2005-03-14 jrandom
* New strict speed calculator that goes off the actual number of messages
verifiably sent through the peer by way of tunnels. Initially, this only
contains the successful message count on inbound tunnels, but may be
augmented later to include verified outbound messages, peers queried in
the netDb, etc. The speed calculation decays quickly, but should give
a better differential than the previous stat (both values are shown on
the /profiles.jsp page)
2005-03-11 jrandom
* Rather than the fixed resend timeout floor (10s), use 10s+RTT as the

View File

@ -18,6 +18,7 @@ import net.i2p.router.peermanager.ProfileManagerImpl;
import net.i2p.router.peermanager.ProfileOrganizer;
import net.i2p.router.peermanager.ReliabilityCalculator;
import net.i2p.router.peermanager.SpeedCalculator;
import net.i2p.router.peermanager.StrictSpeedCalculator;
import net.i2p.router.transport.CommSystemFacadeImpl;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.transport.OutboundMessageRegistry;
@ -62,6 +63,7 @@ public class RouterContext extends I2PAppContext {
private Calculator _speedCalc;
private Calculator _reliabilityCalc;
private Calculator _capacityCalc;
private Calculator _oldSpeedCalc;
private static List _contexts = new ArrayList(1);
@ -114,7 +116,8 @@ public class RouterContext extends I2PAppContext {
_throttle = new RouterDoSThrottle(this);
_isFailingCalc = new IsFailingCalculator(this);
_integrationCalc = new IntegrationCalculator(this);
_speedCalc = new SpeedCalculator(this);
_speedCalc = new StrictSpeedCalculator(this);
_oldSpeedCalc = new SpeedCalculator(this);
_reliabilityCalc = new ReliabilityCalculator(this);
_capacityCalc = new CapacityCalculator(this);
}
@ -248,6 +251,7 @@ public class RouterContext extends I2PAppContext {
public Calculator integrationCalculator() { return _integrationCalc; }
/** how do we rank the speed of profiles? */
public Calculator speedCalculator() { return _speedCalc; }
public Calculator oldSpeedCalculator() { return _oldSpeedCalc; }
/** how do we rank the reliability of profiles? */
public Calculator reliabilityCalculator() { return _reliabilityCalc; }
/** how do we rank the capacity of profiles? */

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.161 $ $Date: 2005/03/07 21:45:15 $";
public final static String ID = "$Revision: 1.162 $ $Date: 2005/03/11 17:23:41 $";
public final static String VERSION = "0.5.0.2";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -38,6 +38,7 @@ public class PeerProfile {
private double _reliabilityValue;
private double _capacityValue;
private double _integrationValue;
private double _oldSpeedValue;
private boolean _isFailing;
// good vs bad behavior
private TunnelHistory _tunnelHistory;
@ -186,6 +187,7 @@ public class PeerProfile {
*
*/
public double getSpeedValue() { return _speedValue; }
public double getOldSpeedValue() { return _oldSpeedValue; }
/**
* How likely are they to stay up and pass on messages over the next few minutes.
* Positive numbers means more likely, negative numbers means its probably not
@ -287,6 +289,7 @@ public class PeerProfile {
_tunnelHistory.coalesceStats();
_speedValue = calculateSpeed();
_oldSpeedValue = calculateOldSpeed();
_reliabilityValue = calculateReliability();
_capacityValue = calculateCapacity();
_integrationValue = calculateIntegration();
@ -297,6 +300,7 @@ public class PeerProfile {
}
private double calculateSpeed() { return _context.speedCalculator().calc(this); }
private double calculateOldSpeed() { return _context.oldSpeedCalculator().calc(this); }
private double calculateReliability() { return _context.reliabilityCalculator().calc(this); }
private double calculateCapacity() { return _context.capacityCalculator().calc(this); }
private double calculateIntegration() { return _context.integrationCalculator().calc(this); }

View File

@ -102,7 +102,8 @@ class ProfileOrganizerRenderer {
}
if (isIntegrated) buf.append(", Integrated");
buf.append("<td align=\"right\">").append(num(prof.getSpeedValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getSpeedValue()));
buf.append('/').append(num(prof.getOldSpeedValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getCapacityValue())).append("</td>");
buf.append("<td align=\"right\">").append(num(prof.getIntegrationValue())).append("</td>");
buf.append("<td align=\"right\">").append(prof.getIsFailing()).append("</td>");

View File

@ -0,0 +1,90 @@
package net.i2p.router.peermanager;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Simple speed calculator that just counts how many messages go through the
* tunnel.
*
*/
public class StrictSpeedCalculator extends Calculator {
private Log _log;
private RouterContext _context;
public StrictSpeedCalculator(RouterContext context) {
_context = context;
_log = context.logManager().getLog(StrictSpeedCalculator.class);
}
public double calc(PeerProfile profile) {
return countSuccesses(profile);
}
private double countSuccesses(PeerProfile profile) {
RateStat success = profile.getTunnelHistory().getProcessSuccessRate();
RateStat failure = profile.getTunnelHistory().getProcessFailureRate();
return messagesPerMinute(success, failure);
}
private double messagesPerMinute(RateStat success, RateStat failure) {
double rv = 0.0d;
if (success != null) {
Rate rate = null;
long periods[] = success.getPeriods();
for (int i = 0; i < periods.length; i++) {
rate = success.getRate(periods[i]);
if ( (rate != null) && (rate.getCurrentTotalValue() > 0) )
break;
}
double value = rate.getCurrentTotalValue();
value += rate.getLastTotalValue();
rv = value * 10.0d * 60.0d * 1000.0d / (double)rate.getPeriod();
// if any of the messages are getting fragmented and cannot be
// handled, penalize like crazy
Rate fail = failure.getRate(rate.getPeriod());
if (fail.getCurrentTotalValue() > 0)
rv /= fail.getCurrentTotalValue();
}
return rv;
}
/*
public double calc(PeerProfile profile) {
double successCount = countSuccesses(profile);
double failureCount = countFailures(profile);
double rv = successCount - 5*failureCount;
if (rv < 0)
rv = 0;
return rv;
}
private double countSuccesses(PeerProfile profile) {
RateStat success = profile.getTunnelHistory().getProcessSuccessRate();
return messagesPerMinute(success);
}
private double countFailures(PeerProfile profile) {
RateStat failure = profile.getTunnelHistory().getProcessFailureRate();
return messagesPerMinute(failure);
}
private double messagesPerMinute(RateStat stat) {
double rv = 0.0d;
if (stat != null) {
Rate rate = null;
long periods[] = stat.getPeriods();
for (int i = 0; i < periods.length; i++) {
rate = stat.getRate(periods[i]);
if ( (rate != null) && (rate.getCurrentTotalValue() > 0) )
break;
}
double value = rate.getCurrentTotalValue();
value += rate.getLastTotalValue();
rv = value * 60.0d * 1000.0d / (double)rate.getPeriod();
}
return rv;
}
*/
}

View File

@ -26,6 +26,8 @@ public class TunnelHistory {
private volatile long _lastFailed;
private RateStat _rejectRate;
private RateStat _failRate;
private RateStat _processSuccessRate;
private RateStat _processFailureRate;
private String _statGroup;
/** probabalistic tunnel rejection due to a flood of requests */
@ -47,8 +49,12 @@ public class TunnelHistory {
private void createRates(String statGroup) {
_rejectRate = new RateStat("tunnelHistory.rejectRate", "How often does this peer reject a tunnel request?", statGroup, new long[] { 60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l });
_failRate = new RateStat("tunnelHistory.failRate", "How often do tunnels this peer accepts fail?", statGroup, new long[] { 60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l });
_processSuccessRate = new RateStat("tunnelHistory.processSuccessRate", "How many messages does a tunnel process?", statGroup, new long[] { 5*60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l });
_processFailureRate = new RateStat("tunnelHistory.processfailureRate", "How many messages does a tunnel fail?", statGroup, new long[] { 5*60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l });
_rejectRate.setStatLog(_context.statManager().getStatLog());
_failRate.setStatLog(_context.statManager().getStatLog());
_processSuccessRate.setStatLog(_context.statManager().getStatLog());
_processFailureRate.setStatLog(_context.statManager().getStatLog());
}
/** total tunnels the peer has agreed to participate in */
@ -70,6 +76,13 @@ public class TunnelHistory {
/** when the last tunnel the peer participated in failed */
public long getLastFailed() { return _lastFailed; }
public void incrementProcessed(int processedSuccessfully, int failedProcessing) {
if (processedSuccessfully > 0)
_processSuccessRate.addData(processedSuccessfully, 0);
if (failedProcessing > 0)
_processFailureRate.addData(failedProcessing, 0);
}
public void incrementAgreedTo() {
_lifetimeAgreedTo++;
_lastAgreedTo = _context.clock().now();
@ -113,12 +126,16 @@ public class TunnelHistory {
public RateStat getRejectionRate() { return _rejectRate; }
public RateStat getFailedRate() { return _failRate; }
public RateStat getProcessSuccessRate() { return _processSuccessRate; }
public RateStat getProcessFailureRate() { return _processFailureRate; }
public void coalesceStats() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Coallescing stats");
_rejectRate.coalesceStats();
_failRate.coalesceStats();
_processFailureRate.coalesceStats();
_processSuccessRate.coalesceStats();
}
private final static String NL = System.getProperty("line.separator");
@ -140,7 +157,9 @@ public class TunnelHistory {
add(buf, "lifetimeRejected", _lifetimeRejected, "How many tunnels has the peer ever refused to participate in?");
out.write(buf.toString().getBytes());
_rejectRate.store(out, "tunnelHistory.rejectRate");
_rejectRate.store(out, "tunnelHistory.failRate");
_failRate.store(out, "tunnelHistory.failRate");
_processSuccessRate.store(out, "tunnelHistory.processSuccessRate");
_processFailureRate.store(out, "tunnelHistory.processFailureRate");
}
private void add(StringBuffer buf, String name, long val, String description) {
@ -162,9 +181,15 @@ public class TunnelHistory {
_rejectRate.load(props, "tunnelHistory.rejectRate", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loading tunnelHistory.rejectRate");
_rejectRate.load(props, "tunnelHistory.failRate", true);
_failRate.load(props, "tunnelHistory.failRate", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loading tunnelHistory.failRate");
_processFailureRate.load(props, "tunnelHistory.processFailureRate", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loading tunnelHistory.processFailureRate");
_processSuccessRate.load(props, "tunnelHistory.processSuccessRate", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loading tunnelHistory.processSuccessRate");
} catch (IllegalArgumentException iae) {
_log.warn("TunnelHistory rates are corrupt, resetting", iae);
createRates(_statGroup);

View File

@ -29,6 +29,8 @@ public class FragmentHandler {
private Log _log;
private Map _fragmentedMessages;
private DefragmentedReceiver _receiver;
private int _completed;
private int _failed;
/** don't wait more than 60s to defragment the partial message */
private static final long MAX_DEFRAGMENT_TIME = 60*1000;
@ -84,6 +86,9 @@ public class FragmentHandler {
}
}
public int getCompleteCount() { return _completed; }
public int getFailedCount() { return _failed; }
private static final ByteCache _validateCache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE);
/**
@ -312,6 +317,7 @@ public class FragmentHandler {
}
private void receiveComplete(FragmentedMessage msg) {
_completed++;
try {
byte data[] = msg.toByteArray();
if (_log.shouldLog(Log.DEBUG))
@ -362,6 +368,7 @@ public class FragmentHandler {
removed = (null != _fragmentedMessages.remove(new Long(_msg.getMessageId())));
}
if (removed && !_msg.getReleased()) {
_failed++;
noteFailure(_msg.getMessageId());
if (_log.shouldLog(Log.ERROR))
_log.error("Dropped failed fragmented message: " + _msg);

View File

@ -14,6 +14,7 @@ import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.peermanager.PeerProfile;
import net.i2p.router.JobImpl;
import net.i2p.router.Router;
import net.i2p.router.Service;
@ -248,22 +249,37 @@ public class TunnelDispatcher implements Service {
TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel();
if (_log.shouldLog(Log.DEBUG))
_log.debug("removing our own inbound " + cfg);
boolean removed = false;
TunnelParticipant participant = null;
synchronized (_participants) {
removed = (null != _participants.remove(recvId));
participant = (TunnelParticipant)_participants.remove(recvId);
}
if (!removed) {
if (participant == null) {
synchronized (_inboundGateways) {
_inboundGateways.remove(recvId);
}
} else {
// update stats based off getCompleteCount() + getFailedCount()
for (int i = 0; i < cfg.getLength(); i++) {
Hash peer = cfg.getPeer(i);
PeerProfile profile = _context.profileOrganizer().getProfile(peer);
if (profile != null) {
int ok = participant.getCompleteCount();
int fail = participant.getFailedCount();
profile.getTunnelHistory().incrementProcessed(ok, fail);
}
}
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("removing our own outbound " + cfg);
TunnelId outId = cfg.getConfig(0).getSendTunnel();
TunnelGateway gw = null;
synchronized (_outboundGateways) {
_outboundGateways.remove(outId);
gw = (TunnelGateway)_outboundGateways.remove(outId);
}
if (gw != null) {
// update stats based on gw.getMessagesSent()
}
}
}

View File

@ -42,6 +42,7 @@ public class TunnelGateway {
private long _lastFlush;
private int _flushFrequency;
private DelayedFlush _delayedFlush;
private int _messagesSent;
/**
* @param preprocessor this pulls Pending messages off a list, builds some
@ -58,6 +59,7 @@ public class TunnelGateway {
_preprocessor = preprocessor;
_sender = sender;
_receiver = receiver;
_messagesSent = 0;
_flushFrequency = 500;
_delayedFlush = new DelayedFlush();
_lastFlush = _context.clock().now();
@ -82,6 +84,7 @@ public class TunnelGateway {
* @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing)
*/
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
_messagesSent++;
boolean delayedFlush = false;
Pending cur = new Pending(msg, toRouter, toTunnel);
@ -105,6 +108,8 @@ public class TunnelGateway {
}
}
public int getMessagesSent() { return _messagesSent; }
public interface Sender {
/**
* Take the preprocessed data containing zero or more fragments, encrypt

View File

@ -98,6 +98,19 @@ public class TunnelParticipant {
}
}
public int getCompleteCount() {
if (_handler != null)
return _handler.getCompleteCount();
else
return 0;
}
public int getFailedCount() {
if (_handler != null)
return _handler.getFailedCount();
else
return 0;
}
private class DefragmentedHandler implements FragmentHandler.DefragmentedReceiver {
public void receiveComplete(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
if (_log.shouldLog(Log.DEBUG))