forked from I2P_Developers/i2p.i2p
* ExploreJob/SearchJob - more fixes:
- Disable ExploreKeySelectorJob completely, just have StartExplorersJob select a random key if queue is empty - Add netDb.alwaysQuery=[B64Hash] for debugging - Queue results of exploration for more exploration - Floodfills periodically shuffle their KBuckets, and FloodfillPeerSelector sorts more keys, so that exploration works well
This commit is contained in:
@ -7,6 +7,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import net.i2p.data.DataFormatException;
|
||||||
import net.i2p.data.DataStructure;
|
import net.i2p.data.DataStructure;
|
||||||
import net.i2p.data.Hash;
|
import net.i2p.data.Hash;
|
||||||
import net.i2p.data.LeaseSet;
|
import net.i2p.data.LeaseSet;
|
||||||
@ -32,11 +33,14 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
|
|||||||
public static final char CAPACITY_FLOODFILL = 'f';
|
public static final char CAPACITY_FLOODFILL = 'f';
|
||||||
private Map _activeFloodQueries;
|
private Map _activeFloodQueries;
|
||||||
private boolean _floodfillEnabled;
|
private boolean _floodfillEnabled;
|
||||||
|
/** for testing, see isFloodfill() below */
|
||||||
|
private static String _alwaysQuery;
|
||||||
|
|
||||||
public FloodfillNetworkDatabaseFacade(RouterContext context) {
|
public FloodfillNetworkDatabaseFacade(RouterContext context) {
|
||||||
super(context);
|
super(context);
|
||||||
_activeFloodQueries = new HashMap();
|
_activeFloodQueries = new HashMap();
|
||||||
_floodfillEnabled = false;
|
_floodfillEnabled = false;
|
||||||
|
_alwaysQuery = _context.getProperty("netDb.alwaysQuery");
|
||||||
|
|
||||||
_context.statManager().createRateStat("netDb.successTime", "How long a successful search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
|
_context.statManager().createRateStat("netDb.successTime", "How long a successful search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
|
||||||
_context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
|
_context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
|
||||||
@ -137,6 +141,19 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
|
|||||||
|
|
||||||
public static boolean isFloodfill(RouterInfo peer) {
|
public static boolean isFloodfill(RouterInfo peer) {
|
||||||
if (peer == null) return false;
|
if (peer == null) return false;
|
||||||
|
// For testing or local networks... we will
|
||||||
|
// pretend that the specified router is floodfill.
|
||||||
|
// Must be set at startup since it's static.
|
||||||
|
// In that router, set netDb.floodfillOnly=false.
|
||||||
|
// Warning - experts only!
|
||||||
|
if (_alwaysQuery != null) {
|
||||||
|
Hash aq = new Hash();
|
||||||
|
try {
|
||||||
|
aq.fromBase64(_alwaysQuery);
|
||||||
|
if (aq.equals(peer.getIdentity().getHash()))
|
||||||
|
return true;
|
||||||
|
} catch (DataFormatException dfe) {}
|
||||||
|
}
|
||||||
String caps = peer.getCapabilities();
|
String caps = peer.getCapabilities();
|
||||||
if ( (caps != null) && (caps.indexOf(FloodfillNetworkDatabaseFacade.CAPACITY_FLOODFILL) != -1) )
|
if ( (caps != null) && (caps.indexOf(FloodfillNetworkDatabaseFacade.CAPACITY_FLOODFILL) != -1) )
|
||||||
return true;
|
return true;
|
||||||
|
@ -78,6 +78,7 @@ class FloodfillPeerSelector extends PeerSelector {
|
|||||||
_wanted = wanted;
|
_wanted = wanted;
|
||||||
}
|
}
|
||||||
public List getFloodfillParticipants() { return _floodfillMatches; }
|
public List getFloodfillParticipants() { return _floodfillMatches; }
|
||||||
|
private static final int EXTRA_MATCHES = 100;
|
||||||
public void add(Hash entry) {
|
public void add(Hash entry) {
|
||||||
//if (_context.profileOrganizer().isFailing(entry))
|
//if (_context.profileOrganizer().isFailing(entry))
|
||||||
// return;
|
// return;
|
||||||
@ -98,7 +99,11 @@ class FloodfillPeerSelector extends PeerSelector {
|
|||||||
if (info != null && FloodfillNetworkDatabaseFacade.isFloodfill(info)) {
|
if (info != null && FloodfillNetworkDatabaseFacade.isFloodfill(info)) {
|
||||||
_floodfillMatches.add(entry);
|
_floodfillMatches.add(entry);
|
||||||
} else {
|
} else {
|
||||||
if ( (!SearchJob.onlyQueryFloodfillPeers(_context)) && (_wanted > _matches) && (_key != null) ) {
|
// This didn't really work because we stopped filling up when _wanted == _matches,
|
||||||
|
// thus we don't add and sort the whole db to find the closest.
|
||||||
|
// So we keep going for a while. This, together with periodically shuffling the
|
||||||
|
// KBucket (see KBucketImpl.add()) makes exploration work well.
|
||||||
|
if ( (!SearchJob.onlyQueryFloodfillPeers(_context)) && (_wanted + EXTRA_MATCHES > _matches) && (_key != null) ) {
|
||||||
BigInteger diff = getDistance(_key, entry);
|
BigInteger diff = getDistance(_key, entry);
|
||||||
_sorted.put(diff, entry);
|
_sorted.put(diff, entry);
|
||||||
} else {
|
} else {
|
||||||
|
@ -10,6 +10,7 @@ package net.i2p.router.networkdb.kademlia;
|
|||||||
|
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -18,6 +19,7 @@ import java.util.Set;
|
|||||||
import net.i2p.I2PAppContext;
|
import net.i2p.I2PAppContext;
|
||||||
import net.i2p.data.DataHelper;
|
import net.i2p.data.DataHelper;
|
||||||
import net.i2p.data.Hash;
|
import net.i2p.data.Hash;
|
||||||
|
import net.i2p.router.RouterContext;
|
||||||
import net.i2p.util.Log;
|
import net.i2p.util.Log;
|
||||||
import net.i2p.util.RandomSource;
|
import net.i2p.util.RandomSource;
|
||||||
|
|
||||||
@ -31,12 +33,16 @@ class KBucketImpl implements KBucket {
|
|||||||
private int _begin;
|
private int _begin;
|
||||||
/** include if no bits higher than this bit (inclusive) are set */
|
/** include if no bits higher than this bit (inclusive) are set */
|
||||||
private int _end;
|
private int _end;
|
||||||
|
/** when did we last shake things up */
|
||||||
|
private long _lastShuffle;
|
||||||
|
private static final int SHUFFLE_DELAY = 10*60*1000;
|
||||||
private I2PAppContext _context;
|
private I2PAppContext _context;
|
||||||
|
|
||||||
public KBucketImpl(I2PAppContext context, Hash local) {
|
public KBucketImpl(I2PAppContext context, Hash local) {
|
||||||
_context = context;
|
_context = context;
|
||||||
_log = context.logManager().getLog(KBucketImpl.class);
|
_log = context.logManager().getLog(KBucketImpl.class);
|
||||||
_entries = new ArrayList(64); //new HashSet();
|
_entries = new ArrayList(64); //new HashSet();
|
||||||
|
_lastShuffle = context.clock().now();
|
||||||
setLocal(local);
|
setLocal(local);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,6 +238,13 @@ class KBucketImpl implements KBucket {
|
|||||||
synchronized (_entries) {
|
synchronized (_entries) {
|
||||||
if (!_entries.contains(peer))
|
if (!_entries.contains(peer))
|
||||||
_entries.add(peer);
|
_entries.add(peer);
|
||||||
|
// Randomize the bucket every once in a while if we are floodfill, so that
|
||||||
|
// exploration will return better results. See FloodfillPeerSelector.add(Hash).
|
||||||
|
if (_lastShuffle + SHUFFLE_DELAY < _context.clock().now() &&
|
||||||
|
!SearchJob.onlyQueryFloodfillPeers((RouterContext)_context)) {
|
||||||
|
Collections.shuffle(_entries, _context.random());
|
||||||
|
_lastShuffle = _context.clock().now();
|
||||||
|
}
|
||||||
return _entries.size();
|
return _entries.size();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -245,6 +258,9 @@ class KBucketImpl implements KBucket {
|
|||||||
/**
|
/**
|
||||||
* Generate a random key to go within this bucket
|
* Generate a random key to go within this bucket
|
||||||
*
|
*
|
||||||
|
* WARNING - Something is seriously broken here. testRand2() fails right away.
|
||||||
|
* ExploreKeySelectorJob is now disabled, ExploreJob just searches for a random
|
||||||
|
* key instead.
|
||||||
*/
|
*/
|
||||||
public Hash generateRandomKey() {
|
public Hash generateRandomKey() {
|
||||||
BigInteger variance = new BigInteger((_end-_begin)-1, _context.random());
|
BigInteger variance = new BigInteger((_end-_begin)-1, _context.random());
|
||||||
@ -336,6 +352,7 @@ class KBucketImpl implements KBucket {
|
|||||||
/**
|
/**
|
||||||
* Test harness to make sure its assigning keys to the right buckets
|
* Test harness to make sure its assigning keys to the right buckets
|
||||||
*
|
*
|
||||||
|
* WARNING - Something is seriously broken here. testRand2() fails right away.
|
||||||
*/
|
*/
|
||||||
public static void main(String args[]) {
|
public static void main(String args[]) {
|
||||||
testRand2();
|
testRand2();
|
||||||
|
@ -298,9 +298,13 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
|
|||||||
|
|
||||||
if (!_quiet) {
|
if (!_quiet) {
|
||||||
// fill the passive queue periodically
|
// fill the passive queue periodically
|
||||||
|
// Is this pointless too???
|
||||||
_context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this));
|
_context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this));
|
||||||
// fill the search queue with random keys in buckets that are too small
|
// fill the search queue with random keys in buckets that are too small
|
||||||
_context.jobQueue().addJob(new ExploreKeySelectorJob(_context, this));
|
// Disabled since KBucketImpl.generateRandomKey() is b0rked,
|
||||||
|
// and anyway, we want to search for a completely random key,
|
||||||
|
// not a random key for a particular kbucket.
|
||||||
|
// _context.jobQueue().addJob(new ExploreKeySelectorJob(_context, this));
|
||||||
if (_exploreJob == null)
|
if (_exploreJob == null)
|
||||||
_exploreJob = new StartExplorersJob(_context, this);
|
_exploreJob = new StartExplorersJob(_context, this);
|
||||||
// fire off a group of searches from the explore pool
|
// fire off a group of searches from the explore pool
|
||||||
|
@ -9,6 +9,7 @@ package net.i2p.router.networkdb.kademlia;
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -121,20 +122,23 @@ class SearchJob extends JobImpl {
|
|||||||
|
|
||||||
private static final boolean DEFAULT_FLOODFILL_ONLY = true;
|
private static final boolean DEFAULT_FLOODFILL_ONLY = true;
|
||||||
|
|
||||||
|
/** this is now misnamed, as it is only used to determine whether to return floodfill peers only */
|
||||||
static boolean onlyQueryFloodfillPeers(RouterContext ctx) {
|
static boolean onlyQueryFloodfillPeers(RouterContext ctx) {
|
||||||
if (isCongested(ctx))
|
//if (isCongested(ctx))
|
||||||
return true;
|
// return true;
|
||||||
// If we are floodfill, we want the FloodfillPeerSelector (in add()) to include
|
// If we are floodfill, we want the FloodfillPeerSelector (in add()) to include
|
||||||
// non-ff peers (if required) in DatabaseSearchReplyMessage responses
|
// non-ff peers (if required) in DatabaseSearchReplyMessage responses
|
||||||
// so that Exploration works.
|
// so that Exploration works.
|
||||||
// ExploreJob is disabled if we are floodfill.
|
// ExploreJob is disabled if we are floodfill.
|
||||||
// The other two places this was called (one below and one in FNDF)
|
// The other two places this was called (one below and one in FNDF)
|
||||||
// have been commented out.
|
// have been commented out.
|
||||||
|
// Returning false essentially enables kademlia as a backup to floodfill for search responses.
|
||||||
if (FloodfillNetworkDatabaseFacade.floodfillEnabled(ctx))
|
if (FloodfillNetworkDatabaseFacade.floodfillEnabled(ctx))
|
||||||
return false;
|
return false;
|
||||||
return Boolean.valueOf(ctx.getProperty("netDb.floodfillOnly", DEFAULT_FLOODFILL_ONLY + "")).booleanValue();
|
return Boolean.valueOf(ctx.getProperty("netDb.floodfillOnly", DEFAULT_FLOODFILL_ONLY + "")).booleanValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
static boolean isCongested(RouterContext ctx) {
|
static boolean isCongested(RouterContext ctx) {
|
||||||
float availableSend = ctx.bandwidthLimiter().getOutboundKBytesPerSecond()*1024 - ctx.bandwidthLimiter().getSendBps();
|
float availableSend = ctx.bandwidthLimiter().getOutboundKBytesPerSecond()*1024 - ctx.bandwidthLimiter().getSendBps();
|
||||||
float availableRecv = ctx.bandwidthLimiter().getInboundKBytesPerSecond()*1024 - ctx.bandwidthLimiter().getReceiveBps();
|
float availableRecv = ctx.bandwidthLimiter().getInboundKBytesPerSecond()*1024 - ctx.bandwidthLimiter().getReceiveBps();
|
||||||
@ -142,6 +146,7 @@ class SearchJob extends JobImpl {
|
|||||||
// in that range without a problem
|
// in that range without a problem
|
||||||
return ( (availableSend < 6*1024) || (availableRecv < 6*1024) );
|
return ( (availableSend < 6*1024) || (availableRecv < 6*1024) );
|
||||||
}
|
}
|
||||||
|
***/
|
||||||
|
|
||||||
static final int PER_FLOODFILL_PEER_TIMEOUT = 10*1000;
|
static final int PER_FLOODFILL_PEER_TIMEOUT = 10*1000;
|
||||||
static final long MIN_TIMEOUT = 2500;
|
static final long MIN_TIMEOUT = 2500;
|
||||||
@ -782,6 +787,18 @@ class SearchJob extends JobImpl {
|
|||||||
|
|
||||||
boolean wasAttempted(Hash peer) { return _state.wasAttempted(peer); }
|
boolean wasAttempted(Hash peer) { return _state.wasAttempted(peer); }
|
||||||
long timeoutMs() { return _timeoutMs; }
|
long timeoutMs() { return _timeoutMs; }
|
||||||
boolean add(Hash peer) { return _facade.getKBuckets().add(peer); }
|
|
||||||
|
/** @return true if peer was new */
|
||||||
|
boolean add(Hash peer) {
|
||||||
|
boolean rv = _facade.getKBuckets().add(peer);
|
||||||
|
if (rv) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug(getJobId() + ": Queueing up for next time: " + peer);
|
||||||
|
Set s = new HashSet(1);
|
||||||
|
s.add(peer);
|
||||||
|
_facade.queueForExploration(s);
|
||||||
|
}
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
void decrementOutstandingFloodfillSearches() { _floodfillSearchesOutstanding--; }
|
void decrementOutstandingFloodfillSearches() { _floodfillSearchesOutstanding--; }
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import net.i2p.util.Log;
|
|||||||
/**
|
/**
|
||||||
* Fire off search jobs for random keys from the explore pool, up to MAX_PER_RUN
|
* Fire off search jobs for random keys from the explore pool, up to MAX_PER_RUN
|
||||||
* at a time.
|
* at a time.
|
||||||
|
* If the explore pool is empty, just search for a random key.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class StartExplorersJob extends JobImpl {
|
class StartExplorersJob extends JobImpl {
|
||||||
@ -82,18 +83,24 @@ class StartExplorersJob extends JobImpl {
|
|||||||
/**
|
/**
|
||||||
* Run through the explore pool and pick out some values
|
* Run through the explore pool and pick out some values
|
||||||
*
|
*
|
||||||
|
* Nope, ExploreKeySelectorJob is disabled, so the explore pool
|
||||||
|
* may be empty. In that case, generate random keys.
|
||||||
*/
|
*/
|
||||||
private Set selectKeysToExplore() {
|
private Set selectKeysToExplore() {
|
||||||
Set queued = _facade.getExploreKeys();
|
Set queued = _facade.getExploreKeys();
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Keys waiting for exploration: " + queued.size());
|
_log.debug("Keys waiting for exploration: " + queued.size());
|
||||||
if (queued.size() <= MAX_PER_RUN)
|
|
||||||
return queued;
|
|
||||||
Set rv = new HashSet(MAX_PER_RUN);
|
Set rv = new HashSet(MAX_PER_RUN);
|
||||||
for (Iterator iter = queued.iterator(); iter.hasNext(); ) {
|
for (Iterator iter = queued.iterator(); iter.hasNext(); ) {
|
||||||
if (rv.size() >= MAX_PER_RUN) break;
|
if (rv.size() >= MAX_PER_RUN) break;
|
||||||
rv.add(iter.next());
|
rv.add(iter.next());
|
||||||
}
|
}
|
||||||
|
for (int i = rv.size(); i < MAX_PER_RUN; i++) {
|
||||||
|
byte hash[] = new byte[Hash.HASH_LENGTH];
|
||||||
|
getContext().random().nextBytes(hash);
|
||||||
|
Hash key = new Hash(hash);
|
||||||
|
rv.add(key);
|
||||||
|
}
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user