2005-03-17 jrandom

* Update the old speed calculator and associated profile data points to
      use a non-tiered moving average of the tunnel test time, avoiding the
      freshness issues of the old tiered speed stats.
    * Explicitly synchronize all of the methods on the PRNG, rather than just
      the feeder methods (sun and kaffe only need the feeder, but it seems ibm
      needs all of them synchronized).
    * Properly use the tunnel tests as part of the profile stats.
    * Don't flood the jobqueue with sequential persist profile tasks, but
      instead, inject a brief scheduling delay between them.
    * Reduce the TCP connection establishment timeout to 20s (which is still
      absurdly excessive)
    * Reduced the max resend delay to 30s so we can get some resends in when
      dealing with client apps that hang up early (e.g. wget)
    * Added more alternative socketManager factories (good call aum!)
This commit is contained in:
jrandom
2005-03-17 22:12:51 +00:00
committed by zzz
parent 538dd07e7b
commit a997a46040
14 changed files with 170 additions and 40 deletions

View File

@ -38,6 +38,26 @@ public class I2PSocketManagerFactory {
public static I2PSocketManager createManager() {
return createManager(getHost(), getPort(), System.getProperties());
}
/**
* Create a socket manager using a brand new destination connected to the
* I2CP router on the local machine on the default port (7654).
*
* @return the newly created socket manager, or null if there were errors
*/
public static I2PSocketManager createManager(Properties opts) {
return createManager(getHost(), getPort(), opts);
}
/**
* Create a socket manager using a brand new destination connected to the
* I2CP router on the specified host and port
*
* @return the newly created socket manager, or null if there were errors
*/
public static I2PSocketManager createManager(String host, int port) {
return createManager(host, port, System.getProperties());
}
/**
* Create a socket manager using a brand new destination connected to the

View File

@ -71,7 +71,7 @@ public class Connection {
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MAX_RESEND_DELAY = 30*1000;
public static final long MIN_RESEND_DELAY = 10*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */

View File

@ -42,7 +42,7 @@ public class RandomSource extends SecureRandom {
* thats what it has been used for.
*
*/
public int nextInt(int n) {
public synchronized int nextInt(int n) {
if (n == 0) return 0;
int val = super.nextInt(n);
if (val < 0) val = 0 - val;
@ -54,19 +54,48 @@ public class RandomSource extends SecureRandom {
* Like the modified nextInt, nextLong(n) returns a random number from 0 through n,
* including 0, excluding n.
*/
public long nextLong(long n) {
public synchronized long nextLong(long n) {
long v = super.nextLong();
if (v < 0) v = 0 - v;
if (v >= n) v = v % n;
return v;
}
/** synchronized for older versions of kaffe */
public void nextBytes(byte bytes[]) {
synchronized (this) {
super.nextBytes(bytes);
}
}
/**
* override as synchronized, for those JVMs that don't always pull via
* nextBytes (cough ibm)
*/
public synchronized boolean nextBoolean() { return super.nextBoolean(); }
/**
* override as synchronized, for those JVMs that don't always pull via
* nextBytes (cough ibm)
*/
public synchronized void nextBytes(byte buf[]) { super.nextBytes(buf); }
/**
* override as synchronized, for those JVMs that don't always pull via
* nextBytes (cough ibm)
*/
public synchronized double nextDouble() { return super.nextDouble(); }
/**
* override as synchronized, for those JVMs that don't always pull via
* nextBytes (cough ibm)
*/
public synchronized float nextFloat() { return super.nextFloat(); }
/**
* override as synchronized, for those JVMs that don't always pull via
* nextBytes (cough ibm)
*/
public synchronized double nextGaussian() { return super.nextGaussian(); }
/**
* override as synchronized, for those JVMs that don't always pull via
* nextBytes (cough ibm)
*/
public synchronized int nextInt() { return super.nextInt(); }
/**
* override as synchronized, for those JVMs that don't always pull via
* nextBytes (cough ibm)
*/
public synchronized long nextLong() { return super.nextLong(); }
public EntropyHarvester harvester() { return _entropyHarvester; }

View File

@ -1,4 +1,20 @@
$Id: history.txt,v 1.169 2005/03/14 22:47:15 jrandom Exp $
$Id: history.txt,v 1.170 2005/03/17 00:29:55 jrandom Exp $
2005-03-17 jrandom
* Update the old speed calculator and associated profile data points to
use a non-tiered moving average of the tunnel test time, avoiding the
freshness issues of the old tiered speed stats.
* Explicitly synchronize all of the methods on the PRNG, rather than just
the feeder methods (sun and kaffe only need the feeder, but it seems ibm
needs all of them synchronized).
* Properly use the tunnel tests as part of the profile stats.
* Don't flood the jobqueue with sequential persist profile tasks, but
instead, inject a brief scheduling delay between them.
* Reduce the TCP connection establishment timeout to 20s (which is still
absurdly excessive)
* Reduced the max resend delay to 30s so we can get some resends in when
dealing with client apps that hang up early (e.g. wget)
* Added more alternative socketManager factories (good call aum!)
2005-03-16 jrandom
* Adjust the old speed calculator to include end to end RTT data in its

View File

@ -89,7 +89,7 @@ public class OutNetMessage {
*/
public long timestamp(String eventName) {
long now = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) {
if (_log.shouldLog(Log.INFO)) {
// only timestamp if we are debugging
synchronized (this) {
locked_initTimestamps();
@ -103,7 +103,7 @@ public class OutNetMessage {
return now - _created;
}
public Map getTimestamps() {
if (_log.shouldLog(Log.DEBUG)) {
if (_log.shouldLog(Log.INFO)) {
synchronized (this) {
locked_initTimestamps();
return (Map)_timestamps.clone();
@ -112,7 +112,7 @@ public class OutNetMessage {
return Collections.EMPTY_MAP;
}
public Long getTimestamp(String eventName) {
if (_log.shouldLog(Log.DEBUG)) {
if (_log.shouldLog(Log.INFO)) {
synchronized (this) {
locked_initTimestamps();
return (Long)_timestamps.get(eventName);
@ -301,7 +301,7 @@ public class OutNetMessage {
}
private void renderTimestamps(StringBuffer buf) {
if (_log.shouldLog(Log.DEBUG)) {
if (_log.shouldLog(Log.INFO)) {
synchronized (this) {
long lastWhen = -1;
for (int i = 0; i < _timestampOrder.size(); i++) {

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.163 $ $Date: 2005/03/14 22:47:15 $";
public final static String ID = "$Revision: 1.164 $ $Date: 2005/03/17 00:29:55 $";
public final static String VERSION = "0.5.0.2";
public final static long BUILD = 4;
public final static long BUILD = 5;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -19,6 +19,7 @@ public class PeerProfile {
private long _lastSentToSuccessfully;
private long _lastFailedSend;
private long _lastHeardFrom;
private double _tunnelTestResponseTimeAvg;
// periodic rates
private RateStat _sendSuccessSize = null;
private RateStat _sendFailureSize = null;
@ -61,6 +62,7 @@ public class PeerProfile {
_integrationValue = 0;
_isFailing = false;
_consecutiveShitlists = 0;
_tunnelTestResponseTimeAvg = 0.0d;
_peer = peer;
if (expand)
expandProfile();
@ -213,6 +215,24 @@ public class PeerProfile {
* is this peer actively failing (aka not worth touching)?
*/
public boolean getIsFailing() { return _isFailing; }
public double getTunnelTestTimeAverage() { return _tunnelTestResponseTimeAvg; }
void setTunnelTestTimeAverage(double avg) { _tunnelTestResponseTimeAvg = avg; }
void updateTunnelTestTimeAverage(long ms) {
if (_tunnelTestResponseTimeAvg <= 0)
_tunnelTestResponseTimeAvg = 30*1000; // should we instead start at $ms?
// weighted since we want to let the average grow quickly and shrink slowly
if (ms < _tunnelTestResponseTimeAvg)
_tunnelTestResponseTimeAvg = 0.95*_tunnelTestResponseTimeAvg + .05*(double)ms;
else
_tunnelTestResponseTimeAvg = 0.75*_tunnelTestResponseTimeAvg + .25*(double)ms;
if ( (_peer != null) && (_log.shouldLog(Log.INFO)) )
_log.info("Updating tunnel test time for " + _peer.toBase64().substring(0,6)
+ " to " + _tunnelTestResponseTimeAvg + " via " + ms);
}
/**
* when the given peer is performing so poorly that we don't want to bother keeping
@ -256,9 +276,9 @@ public class PeerProfile {
if (_tunnelCreateResponseTime == null)
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelTestResponseTime == null)
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 } );
if (_tunnelTestResponseTimeSlow == null)
_tunnelTestResponseTimeSlow = new RateStat("tunnelTestResponseTimeSlow", "how long it takes to successfully test a peer when the time exceeds 5s", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l, });
_tunnelTestResponseTimeSlow = new RateStat("tunnelTestResponseTimeSlow", "how long it takes to successfully test a peer when the time exceeds 5s", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l, });
if (_commError == null)
_commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", group, new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbIntroduction == null)

View File

@ -46,7 +46,7 @@ class PersistProfilesJob extends JobImpl {
PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfilesJob.this);
} else {
// we've got peers left to persist, so requeue the persist profile job
PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfileJob.this);
PersistProfilesJob.PersistProfileJob.this.requeue(1000);
}
}
public String getName() { return "Persist profile"; }

View File

@ -111,6 +111,7 @@ public class ProfileManagerImpl implements ProfileManager {
public void tunnelTestSucceeded(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
data.updateTunnelTestTimeAverage(responseTimeMs);
data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs);
if (responseTimeMs > getSlowThreshold())
data.getTunnelTestResponseTimeSlow().addData(responseTimeMs, responseTimeMs);

View File

@ -109,6 +109,8 @@ class ProfilePersistenceHelper {
buf.append("lastFailedSend=").append(profile.getLastSendFailed()).append(NL);
buf.append("# Last heard from: when did we last get a message from the peer? (milliseconds from the epoch)").append(NL);
buf.append("lastHeardFrom=").append(profile.getLastHeardFrom()).append(NL);
buf.append("# moving average as to how fast the peer replies").append(NL);
buf.append("tunnelTestTimeAverage=").append(profile.getTunnelTestTimeAverage()).append(NL);
buf.append(NL);
out.write(buf.toString().getBytes());
@ -178,6 +180,7 @@ class ProfilePersistenceHelper {
profile.setLastSendSuccessful(getLong(props, "lastSentToSuccessfully"));
profile.setLastSendFailed(getLong(props, "lastFailedSend"));
profile.setLastHeardFrom(getLong(props, "lastHeardFrom"));
profile.setTunnelTestTimeAverage(getDouble(props, "tunnelTestTimeAverage"));
profile.getTunnelHistory().load(props);
profile.getDBHistory().load(props);
@ -214,6 +217,18 @@ class ProfilePersistenceHelper {
}
return 0;
}
private final static double getDouble(Properties props, String key) {
String val = props.getProperty(key);
if (val != null) {
try {
return Double.parseDouble(val);
} catch (NumberFormatException nfe) {
return 0.0;
}
}
return 0.0;
}
private void loadProps(Properties props, File file) {
try {

View File

@ -38,6 +38,7 @@ public class SpeedCalculator extends Calculator {
}
public double calc(PeerProfile profile) {
if (true) return calcAverage(profile);
long threshold = getEventThreshold();
boolean tunnelTestOnly = getUseTunnelTestOnly();
@ -109,16 +110,24 @@ public class SpeedCalculator extends Calculator {
return rv;
}
private double calcAverage(PeerProfile profile) {
double avg = profile.getTunnelTestTimeAverage();
if (avg == 0)
return 0.0;
else
return (60.0*1000.0) / avg;
}
private double adjust(long period, double value) {
switch ((int)period) {
case 10*60*1000:
return value;
case 60*60*1000:
return value * 0.5;
return value * 0.75;
case 24*60*60*1000:
return value * 0.001;
return value * 0.1;
default:
return value * 0.0001;
return value * 0.01;
}
}

View File

@ -76,7 +76,7 @@ public class ConnectionBuilder {
private String _error;
/** If the connection hasn't been built in 30 seconds, give up */
public static final int CONNECTION_TIMEOUT = 30*1000;
public static final int CONNECTION_TIMEOUT = 20*1000;
public static final int WRITE_BUFFER_SIZE = 2*1024;

View File

@ -196,6 +196,7 @@ public class TCPTransport extends TransportImpl {
newPeer = true;
}
msgs.add(msg);
msg.timestamp("TCPTransport.outboundMessageReady queued behind " +(msgs.size()-1));
if (newPeer)
_connectionLock.notifyAll();

View File

@ -24,6 +24,8 @@ class TestJob extends JobImpl {
private TunnelPool _pool;
private PooledTunnelCreatorConfig _cfg;
private boolean _found;
private TunnelInfo _outTunnel;
private TunnelInfo _replyTunnel;
/** base to randomize the test delay on */
private static final int TEST_DELAY = 60*1000;
@ -50,19 +52,19 @@ class TestJob extends JobImpl {
_found = false;
// note: testing with exploratory tunnels always, even if the tested tunnel
// is a client tunnel (per _cfg.getDestination())
TunnelInfo replyTunnel = null;
TunnelInfo outTunnel = null;
_replyTunnel = null;
_outTunnel = null;
if (_cfg.isInbound()) {
replyTunnel = _cfg;
outTunnel = getContext().tunnelManager().selectOutboundTunnel();
_replyTunnel = _cfg;
_outTunnel = getContext().tunnelManager().selectOutboundTunnel();
} else {
replyTunnel = getContext().tunnelManager().selectInboundTunnel();
outTunnel = _cfg;
_replyTunnel = getContext().tunnelManager().selectInboundTunnel();
_outTunnel = _cfg;
}
if ( (replyTunnel == null) || (outTunnel == null) ) {
if ( (_replyTunnel == null) || (_outTunnel == null) ) {
if (_log.shouldLog(Log.ERROR))
_log.error("Insufficient tunnels to test " + _cfg + " with: " + replyTunnel + " / " + outTunnel);
_log.error("Insufficient tunnels to test " + _cfg + " with: " + _replyTunnel + " / " + _outTunnel);
getContext().statManager().addRateData("tunnel.testAborted", _cfg.getLength(), 0);
scheduleRetest();
} else {
@ -77,15 +79,15 @@ class TestJob extends JobImpl {
OnTestReply onReply = new OnTestReply(getContext());
OnTestTimeout onTimeout = new OnTestTimeout(getContext());
getContext().messageRegistry().registerPending(sel, onReply, onTimeout, 3*testPeriod);
sendTest(m, outTunnel, replyTunnel);
sendTest(m);
}
}
private void sendTest(I2NPMessage m, TunnelInfo outTunnel, TunnelInfo replyTunnel) {
private void sendTest(I2NPMessage m) {
if (false) {
getContext().tunnelDispatcher().dispatchOutbound(m, outTunnel.getSendTunnelId(0),
replyTunnel.getReceiveTunnelId(0),
replyTunnel.getPeer(0));
getContext().tunnelDispatcher().dispatchOutbound(m, _outTunnel.getSendTunnelId(0),
_replyTunnel.getReceiveTunnelId(0),
_replyTunnel.getPeer(0));
} else {
// garlic route that DeliveryStatusMessage to ourselves so the endpoints and gateways
// can't tell its a test. to simplify this, we encrypt it with a random key and tag,
@ -116,20 +118,35 @@ class TestJob extends JobImpl {
getContext().sessionKeyManager().tagsReceived(encryptKey, encryptTags);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending garlic test of " + outTunnel + " / " + replyTunnel);
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0),
replyTunnel.getReceiveTunnelId(0),
replyTunnel.getPeer(0));
_log.debug("Sending garlic test of " + _outTunnel + " / " + _replyTunnel);
getContext().tunnelDispatcher().dispatchOutbound(msg, _outTunnel.getSendTunnelId(0),
_replyTunnel.getReceiveTunnelId(0),
_replyTunnel.getPeer(0));
}
}
public void testSuccessful(int ms) {
getContext().statManager().addRateData("tunnel.testSuccessLength", _cfg.getLength(), 0);
getContext().statManager().addRateData("tunnel.testSuccessTime", ms, 0);
noteSuccess(ms, _outTunnel);
noteSuccess(ms, _replyTunnel);
scheduleRetest();
}
private void noteSuccess(long ms, TunnelInfo tunnel) {
if (tunnel != null)
for (int i = 0; i < tunnel.getLength(); i++)
getContext().profileManager().tunnelTestSucceeded(tunnel.getPeer(i), ms);
}
private void testFailed(long timeToFail) {
if (_found) {
// ok, not really a /success/, but we did find it, even though slowly
noteSuccess(timeToFail, _outTunnel);
noteSuccess(timeToFail, _replyTunnel);
}
if (_pool.getSettings().isExploratory())
getContext().statManager().addRateData("tunnel.testExploratoryFailedTime", timeToFail, timeToFail);
else
@ -144,6 +161,8 @@ class TestJob extends JobImpl {
/** how long we allow tests to run for before failing them */
private int getTestPeriod() { return 20*1000; }
private void scheduleRetest() {
_outTunnel = null;
_replyTunnel = null;
int delay = getDelay();
if (_cfg.getExpiration() > getContext().clock().now() + delay)
requeue(delay);