- Locking fixes on peer testing
   - More locking fixes on rebuilding address
   - Slow down peer test frequency, esp. when firewalled
* Transports:
   - Deprecate unused recheckReachability()
This commit is contained in:
zzz
2014-04-27 18:46:11 +00:00
parent aa0616d7c5
commit 87889bb322
7 changed files with 91 additions and 22 deletions

View File

@ -53,7 +53,12 @@ public abstract class CommSystemFacade implements Service {
*
*/
public short getReachabilityStatus() { return STATUS_OK; }
/**
* @deprecated unused
*/
public void recheckReachability() {}
public boolean isBacklogged(Hash dest) { return false; }
public boolean wasUnreachable(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; }

View File

@ -173,6 +173,10 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
return STATUS_OK;
return rv;
}
/**
* @deprecated unused
*/
@Override
public void recheckReachability() { _manager.recheckReachability(); }

View File

@ -132,7 +132,12 @@ public interface Transport {
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException;
public short getReachabilityStatus();
/**
* @deprecated unused
*/
public void recheckReachability();
public boolean isBacklogged(Hash peer);
/**

View File

@ -703,7 +703,12 @@ public abstract class TransportImpl implements Transport {
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException { renderStatusHTML(out); }
public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; }
/**
* @deprecated unused
*/
public void recheckReachability() {}
public boolean isBacklogged(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; }

View File

@ -308,6 +308,9 @@ public class TransportManager implements TransportEventListener {
return rv;
}
/**
* @deprecated unused
*/
public void recheckReachability() {
for (Transport t : _transports.values())
t.recheckReachability();

View File

@ -77,6 +77,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private final PacketBuilder _destroyBuilder;
private short _reachabilityStatus;
private long _reachabilityStatusLastUpdated;
private int _reachabilityStatusUnchanged;
private long _introducersSelectedOn;
private long _lastInboundReceivedOn;
private final DHSessionKeyBuilder.Factory _dhFactory;
@ -189,6 +190,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public static final int DEFAULT_COST = 5;
private static final int TEST_FREQUENCY = 13*60*1000;
private static final int MIN_TEST_FREQUENCY = 45*1000;
static final long[] RATES = { 10*60*1000 };
private static final int[] BID_VALUES = { 15, 20, 50, 65, 80, 95, 100, 115, TransportBid.TRANSIENT_FAIL };
@ -909,8 +911,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
_context.router().rebuildRouterInfo();
}
_testEvent.forceRun();
_testEvent.reschedule(5*1000);
_testEvent.forceRunImmediately();
return updated;
}
@ -1160,11 +1161,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (oldEstablishedOn > 0)
_context.statManager().addRateData("udp.alreadyConnected", oldEstablishedOn, 0);
rebuildIfNecessary();
if (getReachabilityStatus() != CommSystemFacade.STATUS_OK) {
_testEvent.forceRun();
_testEvent.reschedule(0);
synchronized(_rebuildLock) {
rebuildIfNecessary();
if (getReachabilityStatus() != CommSystemFacade.STATUS_OK &&
_reachabilityStatusUnchanged < 7) {
_testEvent.forceRunSoon();
}
}
return true;
}
@ -1831,6 +1833,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
boolean introducersRequired = (!isIPv6) && introducersRequired();
boolean introducersIncluded = false;
if (introducersRequired || !directIncluded) {
// FIXME intro manager doesn't sort introducers, so
// deepEquals() below can fail even with same introducers.
// Only a problem when we have very very few peers to pick from.
int found = _introManager.pickInbound(options, PUBLIC_RELAY_COUNT);
if (found > 0) {
if (_log.shouldLog(Log.INFO))
@ -3014,14 +3019,25 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//}
break;
}
_testEvent.setLastTested();
if (status != CommSystemFacade.STATUS_UNKNOWN) {
if (status != old)
_reachabilityStatusUnchanged = 0;
else
_reachabilityStatusUnchanged++;
}
if ( (status != old) && (status != CommSystemFacade.STATUS_UNKNOWN) ) {
if (_log.shouldLog(Log.INFO))
_log.info("Old status: " + old + " New status: " + status + " from: ", new Exception("traceback"));
if (_log.shouldLog(Log.WARN))
_log.warn("Old status: " + old + " New status: " + status + " from: ", new Exception("traceback"));
// Always rebuild when the status changes, even if our address hasn't changed,
// as rebuildExternalAddress() calls replaceAddress() which calls CSFI.notifyReplaceAddress()
// which will start up NTCP inbound when we transition to OK.
// if (needsRebuild())
rebuildExternalAddress();
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Status unchanged: " + _reachabilityStatus + " (" + _reachabilityStatusUnchanged + " consecutive times), last updated " +
DataHelper.formatDuration(_context.clock().now() - _reachabilityStatusLastUpdated) + " ago");
}
}
@ -3043,9 +3059,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return _reachabilityStatus;
}
/**
* @deprecated unused
*/
@Override
public void recheckReachability() {
_testEvent.runTest();
// FIXME locking if we do this again
//_testEvent.runTest();
}
/**
@ -3105,7 +3125,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* Initiate a test (we are Alice)
*/
private class PeerTestEvent extends SimpleTimer2.TimedEvent {
private volatile boolean _alive;
private boolean _alive;
/** when did we last test our reachability */
private long _lastTested;
private boolean _forceRun;
@ -3114,22 +3134,20 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
super(_context.simpleTimer2());
}
public void timeReached() {
public synchronized void timeReached() {
if (shouldTest()) {
long now = _context.clock().now();
if ( (_forceRun) || (now - _lastTested >= TEST_FREQUENCY) ) {
runTest();
long sinceRun = _context.clock().now() - _lastTested;
if ( (_forceRun && sinceRun >= MIN_TEST_FREQUENCY) || (sinceRun >= TEST_FREQUENCY) ) {
locked_runTest();
}
}
if (_alive) {
long delay = (TEST_FREQUENCY / 2) + _context.random().nextInt(TEST_FREQUENCY);
if (delay <= 0)
throw new RuntimeException("wtf, delay is " + delay);
schedule(delay);
}
}
private void runTest() {
private void locked_runTest() {
PeerState bob = pickTestPeer(BOB, null);
if (bob != null) {
if (_log.shouldLog(Log.INFO))
@ -3145,9 +3163,27 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_forceRun = false;
}
void forceRun() { _forceRun = true; }
/**
* Run within the next 45 seconds at the latest
* @since 0.9.13
*/
public synchronized void forceRunSoon() {
_forceRun = true;
reschedule(MIN_TEST_FREQUENCY);
}
public void setIsAlive(boolean isAlive) {
/**
*
* Run within the next 5 seconds at the latest
* @since 0.9.13
*/
public synchronized void forceRunImmediately() {
_lastTested = 0;
_forceRun = true;
reschedule(5*1000);
}
public synchronized void setIsAlive(boolean isAlive) {
_alive = isAlive;
if (isAlive) {
long delay = _context.random().nextInt(2*TEST_FREQUENCY);
@ -3156,6 +3192,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
cancel();
}
}
/**
* Set the last-tested timer to now
* @since 0.9.13
*/
public synchronized void setLastTested() {
_lastTested = _context.clock().now();
}
}
/**