2005-08-12 jrandom

* Keep detailed stats on the peer testing, publishing the results in the
      netDb.
    * Don't overwrite the status with 'unknown' unless we haven't had a valid
      status in a while.
    * Make sure to avoid shitlisted peers for peer testing.
    * When we get an unknown result to a peer test, try again soon afterwards.
    * When a peer tells us that our address is different from what we expect,
      if we've done a recent peer test with a result of OK, fire off a peer
      test to make sure our IP/port is still valid.  If our test is old or the
      result was not OK, accept their suggestion, but queue up a peer test for
      later.
    * Don't try to do a netDb store to a shitlisted peer, and adjust the way
      we monitor netDb store progress (to clear up the high netDb.storePeers
      stat)
This commit is contained in:
jrandom
2005-08-12 23:54:46 +00:00
committed by zzz
parent 77b995f5ed
commit 1219dadbd5
8 changed files with 153 additions and 30 deletions

View File

@ -1,4 +1,20 @@
$Id: history.txt,v 1.224 2005/08/08 15:35:51 jrandom Exp $
$Id: history.txt,v 1.225 2005/08/10 18:55:41 jrandom Exp $
2005-08-12 jrandom
* Keep detailed stats on the peer testing, publishing the results in the
netDb.
* Don't overwrite the status with 'unknown' unless we haven't had a valid
status in a while.
* Make sure to avoid shitlisted peers for peer testing.
* When we get an unknown result to a peer test, try again soon afterwards.
* When a peer tells us that our address is different from what we expect,
if we've done a recent peer test with a result of OK, fire off a peer
test to make sure our IP/port is still valid. If our test is old or the
result was not OK, accept their suggestion, but queue up a peer test for
later.
* Don't try to do a netDb store to a shitlisted peer, and adjust the way
we monitor netDb store progress (to clear up the high netDb.storePeers
stat)
2005-08-10 jrandom
* Deployed the peer testing implementation to be run every few minutes on

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.213 $ $Date: 2005/08/08 15:35:50 $";
public final static String ID = "$Revision: 1.214 $ $Date: 2005/08/10 18:55:41 $";
public final static String VERSION = "0.6.0.2";
public final static long BUILD = 1;
public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -122,6 +122,13 @@ public class StatisticsManager implements Service {
//includeRate("router.throttleTunnelProcessingTime1m", stats, new long[] { 60*60*1000 });
includeRate("router.fastPeers", stats, new long[] { 60*60*1000 });
includeRate("udp.statusOK", stats, new long[] { 20*60*1000 });
includeRate("udp.statusDifferent", stats, new long[] { 20*60*1000 });
includeRate("udp.statusReject", stats, new long[] { 20*60*1000 });
includeRate("udp.statusUnknown", stats, new long[] { 20*60*1000 });
includeRate("udp.addressUpdated", stats, new long[] { 1*60*1000 });
includeRate("udp.addressTestInsteadOfUpdate", stats, new long[] { 1*60*1000 });
includeRate("clock.skew", stats, new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*1000 });

View File

@ -38,7 +38,7 @@ class StoreJob extends JobImpl {
private long _expiration;
private PeerSelector _peerSelector;
private final static int PARALLELIZATION = 3; // how many sent at a time
private final static int PARALLELIZATION = 6; // how many sent at a time
private final static int REDUNDANCY = 6; // we want the data sent to 6 peers
/**
* additionally send to 1 outlier(s), in case all of the routers chosen in our
@ -146,7 +146,7 @@ class StoreJob extends JobImpl {
return;
}
} else {
_state.addPending(closestHashes);
//_state.addPending(closestHashes);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Continue sending key " + _state.getTarget() + " after " + _state.getAttempted().size() + " tries to " + closestHashes);
for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) {
@ -155,8 +155,14 @@ class StoreJob extends JobImpl {
if ( (ds == null) || !(ds instanceof RouterInfo) ) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
_state.addSkipped(peer);
} else {
sendStore((RouterInfo)ds);
if (getContext().shitlist().isShitlisted(((RouterInfo)ds).getIdentity().calculateHash())) {
_state.addSkipped(peer);
} else {
_state.addPending(peer);
sendStore((RouterInfo)ds);
}
}
}
}

View File

@ -102,6 +102,12 @@ class StoreState {
_attemptedPeers.addAll(pending);
}
}
/** we aren't even going to try to contact this peer */
public void addSkipped(Hash peer) {
synchronized (_attemptedPeers) {
_attemptedPeers.add(peer);
}
}
public long confirmed(Hash peer) {
long rv = -1;

View File

@ -147,7 +147,7 @@ public class PeerTestJob extends JobImpl {
ReplySelector sel = new ReplySelector(peer.getIdentity().getHash(), nonce, expiration);
PeerReplyFoundJob reply = new PeerReplyFoundJob(getContext(), peer, inTunnel, outTunnel);
PeerReplyTimeoutJob timeoutJob = new PeerReplyTimeoutJob(getContext(), peer, inTunnel, outTunnel);
PeerReplyTimeoutJob timeoutJob = new PeerReplyTimeoutJob(getContext(), peer, inTunnel, outTunnel, sel);
getContext().messageRegistry().registerPending(sel, reply, timeoutJob, timeoutMs);
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, null, peer.getIdentity().getHash());
@ -193,10 +193,12 @@ public class PeerTestJob extends JobImpl {
private long _expiration;
private long _nonce;
private Hash _peer;
private boolean _matchFound;
public ReplySelector(Hash peer, long nonce, long expiration) {
_nonce = nonce;
_expiration = expiration;
_peer = peer;
_matchFound = false;
}
public boolean continueMatching() { return false; }
public long getExpiration() { return _expiration; }
@ -213,11 +215,13 @@ public class PeerTestJob extends JobImpl {
} else {
getContext().statManager().addRateData("peer.testOK", getTestTimeout() - timeLeft, 0);
}
_matchFound = true;
return true;
}
}
return false;
}
public boolean matchFound() { return _matchFound; }
public String toString() {
StringBuffer buf = new StringBuffer(64);
buf.append("Test peer ").append(_peer.toBase64().substring(0,4));
@ -268,15 +272,20 @@ public class PeerTestJob extends JobImpl {
private RouterInfo _peer;
private TunnelInfo _replyTunnel;
private TunnelInfo _sendTunnel;
public PeerReplyTimeoutJob(RouterContext context, RouterInfo peer, TunnelInfo replyTunnel, TunnelInfo sendTunnel) {
private ReplySelector _selector;
public PeerReplyTimeoutJob(RouterContext context, RouterInfo peer, TunnelInfo replyTunnel, TunnelInfo sendTunnel, ReplySelector sel) {
super(context);
_peer = peer;
_replyTunnel = replyTunnel;
_sendTunnel = sendTunnel;
_selector = sel;
}
public String getName() { return "Peer test failed"; }
private boolean getShouldFailPeer() { return true; }
public void runJob() {
if (_selector.matchFound())
return;
if (getShouldFailPeer())
getContext().profileManager().dbLookupFailed(_peer.getIdentity().getHash());

View File

@ -164,7 +164,7 @@ public class PeerState {
*/
private static final int DEFAULT_MTU = 608;//600; //1500;
private static final int MIN_RTO = 1000 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 2000; // 5000;
private static final int MAX_RTO = 3000; // 5000;
public PeerState(I2PAppContext ctx) {
_context = ctx;

View File

@ -67,6 +67,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private ExpirePeerEvent _expireEvent;
private PeerTestEvent _testEvent;
private short _reachabilityStatus;
private long _reachabilityStatusLastUpdated;
/** list of RelayPeer objects for people who will relay to us */
private List _relayPeers;
@ -152,6 +153,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.droppedPeerInactive", "How long ago did we receive from a dropped peer (duration == session lifetime)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.peersByCapacity", "How many peers of the given capacity were available to pick between? (duration == (int)capacity)", "udp", new long[] { 1*60*1000, 5*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.statusOK", "How many times the peer test returned OK", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.statusDifferent", "How many times the peer test returned different IP/ports", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.statusReject", "How many times the peer test returned reject unsolicited", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.statusUnknown", "How many times the peer test returned an unknown result", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.addressTestInsteadOfUpdate", "How many times we fire off a peer test of ourselves instead of adjusting our own reachable address?", "udp", new long[] { 1*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.addressUpdated", "How many times we adjust our own reachable IP address", "udp", new long[] { 1*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 });
}
public void startup() {
@ -290,28 +298,46 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
boolean fixedPort = getIsPortFixed();
boolean updated = false;
boolean fireTest = false;
synchronized (this) {
if ( (_externalListenHost == null) ||
(!eq(_externalListenHost.getAddress(), _externalListenPort, ourIP, ourPort)) ) {
try {
_externalListenHost = InetAddress.getByAddress(ourIP);
if (!fixedPort)
_externalListenPort = ourPort;
rebuildExternalAddress();
replaceAddress(_externalAddress);
updated = true;
} catch (UnknownHostException uhe) {
_externalListenHost = null;
if ( (_reachabilityStatus != CommSystemFacade.STATUS_OK) ||
(_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) {
// they told us something different and our tests are either old or failing
try {
_externalListenHost = InetAddress.getByAddress(ourIP);
if (!fixedPort)
_externalListenPort = ourPort;
rebuildExternalAddress();
replaceAddress(_externalAddress);
updated = true;
} catch (UnknownHostException uhe) {
_externalListenHost = null;
}
} else {
// they told us something different, but our tests are recent and positive,
// so lets test again
fireTest = true;
}
} else {
// matched what we expect
}
}
if (!fixedPort)
_context.router().setConfigSetting(PROP_EXTERNAL_PORT, ourPort+"");
_context.router().saveConfig();
if (updated)
if (fireTest) {
_context.statManager().addRateData("udp.addressTestInsteadOfUpdate", 1, 0);
_testEvent.forceRun();
SimpleTimer.getInstance().addEvent(_testEvent, 5*1000);
} else if (updated) {
_context.statManager().addRateData("udp.addressUpdated", 1, 0);
if (!fixedPort)
_context.router().setConfigSetting(PROP_EXTERNAL_PORT, ourPort+"");
_context.router().saveConfig();
_context.router().rebuildRouterInfo();
_testEvent.forceRun();
SimpleTimer.getInstance().addEvent(_testEvent, 5*1000);
}
}
private static final boolean eq(byte laddr[], int lport, byte raddr[], int rport) {
@ -355,12 +381,25 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public PeerState getPeerState(char capacity) {
int index = _context.random().nextInt(1024);
List peers = _peersByCapacity[capacity-'A'];
synchronized (peers) {
int size = peers.size();
if (size <= 0) return null;
index = index % size;
return (PeerState)peers.get(index);
int size = 0;
PeerState rv = null;
for (int i = 0; i < 5; i++) {
synchronized (peers) {
size = peers.size();
if (size > 0) {
index = (index + i) % size;
rv = (PeerState)peers.get(index);
}
}
if (rv == null)
break;
if (_context.shitlist().isShitlisted(rv.getRemotePeer()))
rv = null;
else
break;
}
_context.statManager().addRateData("udp.peersByCapacity", size, capacity);
return rv;
}
private static final int MAX_PEERS_PER_CAPACITY = 16;
@ -991,7 +1030,44 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
void setReachabilityStatus(short status) { _reachabilityStatus = status; }
/**
* If we haven't had a non-unknown test result in 5 minutes, we really dont know. Otherwise,
* when we receive an unknown we should ignore that value and try again (with different peers)
*
*/
private static final long STATUS_GRACE_PERIOD = 5*60*1000;
void setReachabilityStatus(short status) {
long now = _context.clock().now();
switch (status) {
case CommSystemFacade.STATUS_OK:
_context.statManager().addRateData("udp.statusOK", 1, 0);
_reachabilityStatus = status;
_reachabilityStatusLastUpdated = now;
break;
case CommSystemFacade.STATUS_DIFFERENT:
_context.statManager().addRateData("udp.statusDifferent", 1, 0);
_reachabilityStatus = status;
_reachabilityStatusLastUpdated = now;
break;
case CommSystemFacade.STATUS_REJECT_UNSOLICITED:
_context.statManager().addRateData("udp.statusReject", 1, 0);
_reachabilityStatus = status;
_reachabilityStatusLastUpdated = now;
break;
case CommSystemFacade.STATUS_UNKNOWN:
default:
_context.statManager().addRateData("udp.statusUnknown", 1, 0);
if (now - _reachabilityStatusLastUpdated < STATUS_GRACE_PERIOD) {
_testEvent.forceRun();
SimpleTimer.getInstance().addEvent(_testEvent, 5*1000);
} else {
_reachabilityStatus = status;
_reachabilityStatusLastUpdated = now;
}
break;
}
}
public short getReachabilityStatus() { return _reachabilityStatus; }
public void recheckReachability() {
_testEvent.runTest();
@ -1009,11 +1085,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private boolean _alive;
/** when did we last test our reachability */
private long _lastTested;
private boolean _forceRun;
public void timeReached() {
if (shouldTest()) {
long now = _context.clock().now();
if (now - _lastTested >= TEST_FREQUENCY) {
if ( (_forceRun) || (now - _lastTested >= TEST_FREQUENCY) ) {
runTest();
}
}
@ -1036,6 +1113,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_lastTested = _context.clock().now();
}
private void forceRun() { _forceRun = true; }
public void setIsAlive(boolean isAlive) {
_alive = isAlive;
if (isAlive) {