NetDB: Fixes for hidden routers losing peers (ticket #2673)

Explore more aggressively, increase thresholds
Explore with standard non-explore lookup if low on floodfills
Run RefreshRoutersJob if low on floodfills
Refactor SearchReplyJob to process all hashes at once
Transport: Use NTCP and SSU equally if hidden
zzz
2019-12-17 16:17:15 +00:00
parent ad3c978c7c
commit d054652952
7 changed files with 130 additions and 104 deletions

View File

@@ -1,3 +1,20 @@
2019-12-17 zzz
* NetDB: Fixes for hidden routers losing peers (ticket #2673)
2019-12-16 zzz
* Console: Partial az translation
2019-12-15 zzz
* Console:
- Hide services sidebar section if empty
- Fix Hebrew translation
2019-12-14 zzz
* Console:
- Add Content-Disposition header to graphs
- Stat group display names
* Router: Add new known peers stat
2019-12-03 zzz
* NDT: Numerous fixes (ticket #2672)
* OCMOSJ: Cancel timeout job on reply

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 2;
public final static long BUILD = 3;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -29,11 +29,12 @@ import net.i2p.util.Log;
*
*/
class ExploreJob extends SearchJob {
private FloodfillPeerSelector _peerSelector;
private final FloodfillPeerSelector _peerSelector;
private final boolean _isRealExplore;
/** how long each exploration should run for
* The exploration won't "succeed" so we make it long so we query several peers */
private static final long MAX_EXPLORE_TIME = 15*1000;
private static final long MAX_EXPLORE_TIME = 30*1000;
/** how many peers to explore through concurrently */
private static final int EXPLORE_BREDTH = 1;
@@ -50,13 +51,16 @@ class ExploreJob extends SearchJob {
/**
* Create a new search for the routingKey specified
*
* @param isRealExplore if true, a standard exploration (no floodfills will be returned)
* if false, a standard lookup (floodfills will be returned, use if low on floodfills)
*/
public ExploreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key) {
public ExploreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, boolean isRealExplore) {
// note that we're treating the last param (isLease) as *false* since we're just exploring.
// if this collides with an actual leaseSet's key, neat, but that wouldn't imply we're actually
// attempting to send that lease a message!
super(context, facade, key, null, null, MAX_EXPLORE_TIME, false, false);
_peerSelector = (FloodfillPeerSelector) (_facade.getPeerSelector());
_isRealExplore = isRealExplore;
}
/**
@@ -93,6 +97,7 @@ class ExploreJob extends SearchJob {
msg.setReplyTunnel(replyTunnelId);
int available = MAX_CLOSEST - dontIncludePeers.size();
if (_isRealExplore) {
if (available > 0) {
// Add a flag to say this is an exploration and we don't want floodfills in the responses.
// Doing it this way is of course backwards-compatible.
@@ -102,6 +107,9 @@
}
// supported as of 0.9.16. TODO remove fake hash above
msg.setSearchType(DatabaseLookupMessage.Type.EXPL);
} else {
msg.setSearchType(DatabaseLookupMessage.Type.RI);
}
KBucketSet<Hash> ks = _facade.getKBuckets();
Hash rkey = getContext().routingKeyGenerator().getRoutingKey(getState().getTarget());
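
For orientation, here is a minimal standalone sketch of what the new isRealExplore flag controls, using a placeholder enum instead of the real DatabaseLookupMessage.Type; a simplified model, not the actual I2P classes:

    public class ExploreTypeSketch {

        // Stand-in for DatabaseLookupMessage.Type in the real code
        enum LookupType { EXPL, RI }

        /**
         * true  -> EXPL (supported as of 0.9.16): a real exploration, floodfills excluded from replies
         * false -> RI: a plain router-info lookup, floodfills may be returned (used when low on floodfills)
         */
        static LookupType searchTypeFor(boolean isRealExplore) {
            return isRealExplore ? LookupType.EXPL : LookupType.RI;
        }

        public static void main(String[] args) {
            System.out.println("real explore -> " + searchTypeFor(true));   // EXPL
            System.out.println("ff refill    -> " + searchTypeFor(false));  // RI
        }
    }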

View File

@@ -21,18 +21,29 @@ import net.i2p.util.Log;
* To improve integration even more, we fetch the floodfills first.
* Ideally this should complete within the first half-hour of uptime.
*
* As of 0.9.45, periodically rerun, to maintain a minimum number of
* floodfills, primarily for hidden mode. StartExplorersJob will get us
* to about 100 ffs and maintain that for a while, but they will eventually
* start to expire. Use this to get us to 300 or more. Each pass of this
* will gain us about 150 ffs. If we have more than 300 ffs, we just
* requeue to check later. Otherwise this will grow our netdb
* almost unbounded, as it prevents most normal expiration.
*
* @since 0.8.8
*/
class RefreshRoutersJob extends JobImpl {
private final Log _log;
private final FloodfillNetworkDatabaseFacade _facade;
private List<Hash> _routers;
private boolean _wasRun;
/** rerun fairly often. 1000 routers in 50 minutes
* Don't go faster as this overloads the expl. OBEP / IBGW
*/
private final static long RERUN_DELAY_MS = 3*1000;
private final static long EXPIRE = 2*60*60*1000;
private final static long NEW_LOOP_DELAY = 37*60*1000;
private static final int ENOUGH_FFS = 3 * StartExplorersJob.LOW_FFS;
public RefreshRoutersJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade) {
super(ctx);
@@ -45,6 +56,15 @@ class RefreshRoutersJob extends JobImpl {
public void runJob() {
if (_facade.isInitialized()) {
if (_routers == null) {
if (_wasRun) {
int ffs = getContext().peerManager().countPeersByCapability(FloodfillNetworkDatabaseFacade.CAPABILITY_FLOODFILL);
if (ffs >= ENOUGH_FFS) {
requeue(NEW_LOOP_DELAY);
return;
}
} else {
_wasRun = true;
}
// make a list of all routers, floodfill first
_routers = _facade.getFloodfillPeers();
int ff = _routers.size();
@@ -58,6 +78,11 @@ class RefreshRoutersJob extends JobImpl {
if (_routers.isEmpty()) {
if (_log.shouldLog(Log.INFO))
_log.info("Finished");
// despite best efforts in StartExplorersJob,
// hidden mode routers have trouble keeping peers
// but we'll do this for everybody just in case
_routers = null;
requeue(NEW_LOOP_DELAY);
return;
}
long expire = getContext().clock().now() - EXPIRE;
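
The numbers in the comments above follow from the constants in this commit. A standalone arithmetic check in plain Java, with the constants copied from this diff and from StartExplorersJob below:

    public class RefreshRoutersMath {
        public static void main(String[] args) {
            int MIN_FFS = 50;                      // StartExplorersJob, below
            int LOW_FFS = 2 * MIN_FFS;             // 100
            int ENOUGH_FFS = 3 * LOW_FFS;          // 300, the "300 or more" in the javadoc above
            long RERUN_DELAY_MS = 3 * 1000;        // one router looked up every 3 seconds
            long NEW_LOOP_DELAY = 37 * 60 * 1000;  // pause before the next full pass

            System.out.println("ENOUGH_FFS = " + ENOUGH_FFS);
            // "1000 routers in 50 minutes" from the comment above:
            System.out.println("routers per 50 min = " + (50 * 60 * 1000 / RERUN_DELAY_MS));
            System.out.println("next pass after " + (NEW_LOOP_DELAY / (60 * 1000)) + " minutes");
        }
    }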

View File

@@ -17,8 +17,8 @@ import net.i2p.util.Log;
*/
class SearchReplyJob extends JobImpl {
private DatabaseSearchReplyMessage _msg;
private Log _log;
private final DatabaseSearchReplyMessage _msg;
private final Log _log;
/**
* Peer who we think sent us the reply. Note: could be spoofed! If the
* attacker knew we were searching for a particular key from a
@@ -28,51 +28,46 @@ class SearchReplyJob extends JobImpl {
* nonce in the search + searchReply (and check for it in the selector).
*
*/
private Hash _peer;
private int _curIndex;
private int _invalidPeers;
private final Hash _peer;
private int _seenPeers;
private int _newPeers;
private int _duplicatePeers;
private int _repliesPendingVerification;
private long _duration;
private SearchJob _searchJob;
private final long _duration;
private final SearchJob _searchJob;
public SearchReplyJob(RouterContext enclosingContext, SearchJob job, DatabaseSearchReplyMessage message, Hash peer, long duration) {
super(enclosingContext);
_log = enclosingContext.logManager().getLog(getClass());
_searchJob = job;
_msg = message;
_peer = peer;
_curIndex = 0;
_invalidPeers = 0;
_seenPeers = 0;
_newPeers = 0;
_duplicatePeers = 0;
_repliesPendingVerification = 0;
if (duration > 0)
_duration = duration;
else
_duration = 0;
}
public String getName() { return "Process Reply for Kademlia Search"; }
public void runJob() {
if (_curIndex >= _msg.getNumReplies()) {
if (_log.shouldLog(Log.DEBUG) && _msg.getNumReplies() == 0)
int count = _msg.getNumReplies();
for (int i = 0; i < count; i++) {
processPeer(i);
}
if (count == 0 && _log.shouldDebug())
_log.debug(getJobId() + ": dbSearchReply received with no routers referenced");
if (_repliesPendingVerification > 0) {
// we received new references from the peer, but still
// haven't verified all of them, so lets give it more time
requeue(_searchJob.timeoutMs());
} else {
// either they didn't tell us anything new or we have verified
// (or failed to verify) all of them. we're done
getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
_invalidPeers, _duplicatePeers, _duration);
0, _duplicatePeers, _duration);
if (_newPeers > 0)
_searchJob.newPeersFound(_newPeers);
}
} else {
Hash peer = _msg.getReply(_curIndex);
private void processPeer(int curIndex) {
Hash peer = _msg.getReply(curIndex);
boolean shouldAdd = false;
@@ -116,53 +111,5 @@ class SearchReplyJob extends JobImpl {
else
_seenPeers++;
}
_curIndex++;
requeue(0);
}
}
void replyVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
getContext().statManager().addRateData("netDb.searchReplyValidated", 1);
}
void replyNotVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
_invalidPeers++;
getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1);
}
}
/** the peer gave us a reference to a new router, and we were able to fetch it */
/***
class ReplyVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_replyJob = srj;
_key = key;
}
public String getName() { return "Search reply value verified"; }
public void runJob() { _replyJob.replyVerified(); }
}
***/
/** the peer gave us a reference to a new router, and we were NOT able to fetch it */
/***
class ReplyNotVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyNotVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_key = key;
_replyJob = srj;
}
public String getName() { return "Search reply value NOT verified"; }
public void runJob() { _replyJob.replyNotVerified(); }
}
***/
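
The refactor above replaces the one-hash-per-run pattern (process _curIndex, increment, requeue(0)) with a single loop over all reply hashes. A standalone model of the difference in job runs, simplified and not the real I2P job classes:

    public class ReplyProcessingSketch {

        /** Old shape: one hash per run, then requeue(0) until _curIndex runs past the replies. */
        static int jobRunsOldStyle(int numReplies) {
            int runs = 0;
            int curIndex = 0;
            while (true) {
                runs++;
                if (curIndex >= numReplies)
                    break;       // final run does the profile update
                curIndex++;      // processed one hash; requeue(0) schedules the next run
            }
            return runs;
        }

        /** New shape: process every hash in a single run, then update the profile once. */
        static int jobRunsNewStyle(int numReplies) {
            for (int i = 0; i < numReplies; i++) {
                // processPeer(i) handles one reply hash
            }
            return 1;
        }

        public static void main(String[] args) {
            int n = 10;
            System.out.println("old: " + jobRunsOldStyle(n) + " job runs"); // 11
            System.out.println("new: " + jobRunsNewStyle(n) + " job run");  // 1
        }
    }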

View File

@@ -23,6 +23,9 @@ import net.i2p.util.Log;
* at a time.
* If the explore pool is empty, just search for a random key.
*
* For hidden mode routers, this is the primary mechanism for staying integrated.
* The goal is to keep known router count above LOW_ROUTERS and
* the known floodfill count above LOW_FFS.
*/
class StartExplorersJob extends JobImpl {
private final Log _log;
@@ -31,17 +34,23 @@ class StartExplorersJob extends JobImpl {
/** don't explore more than 1 bucket at a time */
private static final int MAX_PER_RUN = 1;
/** dont explore the network more often than this */
private static final int MIN_RERUN_DELAY_MS = 99*1000;
private static final int MIN_RERUN_DELAY_MS = 55*1000;
/** explore the network at least this often */
private static final int MAX_RERUN_DELAY_MS = 15*60*1000;
/** aggressively explore during this time - same as KNDF expiration grace period */
private static final int STARTUP_TIME = 60*60*1000;
/** super-aggressively explore if we have less than this many routers */
private static final int LOW_ROUTERS = 125;
/** super-aggressively explore if we have less than this many routers.
The goal here is to avoid reseeding.
*/
/** very aggressively explore if we have less than this many routers */
private static final int MIN_ROUTERS = 3 * KademliaNetworkDatabaseFacade.MIN_RESEED;
/** aggressively explore if we have less than this many routers */
private static final int MIN_ROUTERS = 250;
private static final int LOW_ROUTERS = 2 * MIN_ROUTERS;
/** explore slowly if we have more than this many routers */
private static final int MAX_ROUTERS = 800;
private static final int MAX_ROUTERS = 2 * LOW_ROUTERS;
private static final int MIN_FFS = 50;
static final int LOW_FFS = 2 * MIN_FFS;
private static final long MAX_LAG = 100;
private static final long MAX_MSG_DELAY = 1500;
@@ -60,21 +69,37 @@ class StartExplorersJob extends JobImpl {
// message delay limit also?
getContext().router().gracefulShutdownInProgress())) {
int num = MAX_PER_RUN;
if (_facade.getDataStore().size() < LOW_ROUTERS)
num *= 3;
int count = _facade.getDataStore().size();
if (count < MIN_ROUTERS)
num *= 15; // at less than 3x MIN_RESEED, explore extremely aggressively
else if (count < LOW_ROUTERS)
num *= 10; // 3x was not sufficient to keep hidden routers from losing peers
if (getContext().router().getUptime() < STARTUP_TIME)
num *= 3;
num *= 2;
Set<Hash> toExplore = selectKeysToExplore(num);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Keys to explore during this run: " + toExplore);
_log.debug("Keys to explore during this run: " + toExplore + ", wanted " + num + ", got " + toExplore.size());
_facade.removeFromExploreKeys(toExplore);
long delay = 0;
// If we're below about 30 ffs, standard exploration stops working well.
// A non-exploratory "exploration" finds us floodfills quickly.
// This is vital when in hidden mode, where this is our primary method
// of maintaining sufficient peers and avoiding repeated reseeding.
int ffs = getContext().peerManager().countPeersByCapability(FloodfillNetworkDatabaseFacade.CAPABILITY_FLOODFILL);
boolean needffs = ffs < MIN_FFS;
boolean lowffs = ffs < LOW_FFS;
for (Hash key : toExplore) {
ExploreJob j = new ExploreJob(getContext(), _facade, key);
// Last param false means get floodfills (non-explore)
// This is very effective so we don't need to do it often
boolean realexpl = !((needffs && getContext().random().nextInt(2) == 0) ||
(lowffs && getContext().random().nextInt(4) == 0));
ExploreJob j = new ExploreJob(getContext(), _facade, key, realexpl);
if (delay > 0)
j.getTiming().setStartAfter(getContext().clock().now() + delay);
getContext().jobQueue().addJob(j);
delay += 200;
// spread them out
delay += 1250;
}
}
long delay = getNextRunDelay();
@@ -141,8 +166,8 @@ class StartExplorersJob extends JobImpl {
_log.debug("Keys waiting for exploration: " + queued.size());
Set<Hash> rv = new HashSet<Hash>(num);
for (Hash key : queued) {
if (rv.size() >= num) break;
rv.add(key);
if (rv.size() >= num) break;
}
for (int i = rv.size(); i < num; i++) {
byte hash[] = new byte[Hash.HASH_LENGTH];
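
Putting the new per-run logic above together: how many explorations to launch, and when to turn one into a plain RI lookup. A standalone sketch; the floodfill thresholds are copied from this diff, while the router-count thresholds are passed in as example values since the real ones are the MIN_ROUTERS/LOW_ROUTERS constants above:

    import java.util.Random;

    public class ExploreDecisionSketch {

        static final int MIN_FFS = 50;
        static final int LOW_FFS = 2 * MIN_FFS;  // 100

        /** Mirrors the num *= ... chain above (MAX_PER_RUN = 1). */
        static int keysThisRun(int knownRouters, int minRouters, int lowRouters, boolean firstHourOfUptime) {
            int num = 1;
            if (knownRouters < minRouters)
                num *= 15;        // extremely aggressive, to avoid reseeding
            else if (knownRouters < lowRouters)
                num *= 10;        // aggressive; 3x was not enough for hidden routers
            if (firstHourOfUptime)
                num *= 2;
            return num;
        }

        /** true = normal exploration (EXPL); false = plain RI lookup to pick up floodfills. */
        static boolean realExplore(int ffs, Random rnd) {
            boolean needffs = ffs < MIN_FFS;
            boolean lowffs = ffs < LOW_FFS;
            // The lower the floodfill count, the more often this comes back false.
            return !((needffs && rnd.nextInt(2) == 0) ||
                     (lowffs && rnd.nextInt(4) == 0));
        }

        public static void main(String[] args) {
            Random rnd = new Random();
            // Example thresholds only; see the MIN_ROUTERS/LOW_ROUTERS constants in the diff.
            System.out.println("keys this run: " + keysThisRun(120, 150, 300, true));
            System.out.println("real explore?  " + realExplore(40, rnd));
        }
    }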

View File

@@ -1939,7 +1939,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// (especially when we have an IPv6 address and the increased minimums),
// and if UDP is completely blocked we'll still have some connectivity.
// TODO After some time, decide that UDP is blocked/broken and return TRANSIENT_FAIL_BID?
if (_context.random().nextInt(4) == 0)
// Even more if hidden.
// We'll have very low connection counts, and we don't need peer testing
int ratio = _context.router().isHidden() ? 2 : 4;
if (_context.random().nextInt(ratio) == 0)
return _cachedBid[SLOWEST_BID];
else
return _cachedBid[SLOW_PREFERRED_BID];
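
Finally, the transport change: when hidden, the 1-in-4 chance of returning the slowest SSU bid becomes 1-in-2, so, assuming the slowest bid effectively lets NTCP win that connection as the surrounding comments suggest, NTCP and SSU end up used roughly equally. A standalone simulation of just that ratio, not the real UDPTransport code:

    import java.util.Random;

    public class HiddenBidSketch {
        public static void main(String[] args) {
            Random rnd = new Random();
            int trials = 100000;
            for (boolean hidden : new boolean[] { false, true }) {
                int ratio = hidden ? 2 : 4;    // the change in this commit
                int slowest = 0;
                for (int i = 0; i < trials; i++) {
                    if (rnd.nextInt(ratio) == 0)
                        slowest++;             // SLOWEST_BID: presumably NTCP wins this one
                }
                System.out.printf("hidden=%b -> SLOWEST_BID %.1f%% of the time%n",
                        hidden, 100.0 * slowest / trials);
            }
        }
    }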