simple exponential decay on the exploration frequency

(min/max frequency = 1m/30m, doubling every time we don't find something new)
reduce the breadth and duration of explorations
republish less often (every 60s, send out one random key [plus however many explicit ones])
logging
jrandom
2004-04-30 07:08:25 +00:00
committed by zzz
parent 0a4ddedac9
commit e246cd37dd
6 changed files with 100 additions and 23 deletions
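In effect, each exploration run requeues itself with the time elapsed since the last new-peer discovery, clamped to the 1m/30m bounds, so the gap between runs doubles while nothing new turns up. A minimal sketch of that schedule (illustrative method name, mirroring getNextRunDelay() in StartExplorersJob below):

    // Sketch of the backoff described above (illustrative, not from the diff).
    // Requeueing with the time since the last discovery doubles the gap
    // between runs while nothing new is found: 1m, 2m, 4m, ... capped at 30m.
    static long nextExploreDelay(long now, long lastFound) {
        long delay = now - lastFound;
        if (delay < 60*1000) return 60*1000;          // floor: once a minute
        if (delay > 30*60*1000) return 30*60*1000;    // ceiling: every 30 minutes
        return delay;
    }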

DataPublisherJob.java

@@ -25,8 +25,8 @@ import net.i2p.router.RouterContext;
 class DataPublisherJob extends JobImpl {
     private Log _log;
     private KademliaNetworkDatabaseFacade _facade;
-    private final static long RERUN_DELAY_MS = 30*1000;
-    private final static int MAX_SEND_PER_RUN = 5; // publish no more than 5 at a time
+    private final static long RERUN_DELAY_MS = 60*1000;
+    private final static int MAX_SEND_PER_RUN = 1; // publish no more than 1 at a time
     private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data
     public DataPublisherJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) {
@@ -39,18 +39,22 @@ class DataPublisherJob extends JobImpl {
     public String getName() { return "Data Publisher Job"; }
     public void runJob() {
         Set toSend = selectKeysToSend();
-        _log.info("Keys being published in this timeslice: " + toSend);
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Keys being published in this timeslice: " + toSend);
         for (Iterator iter = toSend.iterator(); iter.hasNext(); ) {
             Hash key = (Hash)iter.next();
             DataStructure data = _facade.getDataStore().get(key);
             if (data == null) {
-                _log.warn("Trying to send a key we dont have? " + key);
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Trying to send a key we dont have? " + key);
                 continue;
             }
             if (data instanceof LeaseSet) {
                 LeaseSet ls = (LeaseSet)data;
                 if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) {
-                    _log.warn("Not publishing a lease that isn't current - " + key, new Exception("Publish expired lease?"));
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("Not publishing a lease that isn't current - " + key,
+                                  new Exception("Publish expired lease?"));
                 }
             }
             StoreJob store = new StoreJob(_context, _facade, key, data, null, null, STORE_TIMEOUT);
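Note the pattern applied throughout this commit: each log call is wrapped in a level guard so the string concatenation is skipped entirely when that level is disabled, e.g.:

    if (_log.shouldLog(Log.INFO))
        _log.info("Keys being published in this timeslice: " + toSend);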

DataRepublishingSelectorJob.java

@@ -28,7 +28,7 @@ class DataRepublishingSelectorJob extends JobImpl {
     private KademliaNetworkDatabaseFacade _facade;
     private final static long RERUN_DELAY_MS = 1*60*1000;
-    public final static int MAX_PASSIVE_POOL_SIZE = 30; // no need to have the pool be too big
+    public final static int MAX_PASSIVE_POOL_SIZE = 10; // no need to have the pool be too big
     /**
      * For every bucket away from us, resend period increases by 5 minutes - so we resend
@@ -60,7 +60,8 @@ class DataRepublishingSelectorJob extends JobImpl {
     public String getName() { return "Data Publisher Job"; }
     public void runJob() {
         Set toSend = selectKeysToSend();
-        _log.info("Keys being queued up for publishing: " + toSend);
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Keys being queued up for publishing: " + toSend);
         _facade.queueForPublishing(toSend);
         requeue(RERUN_DELAY_MS);
     }
@@ -77,7 +78,8 @@ class DataRepublishingSelectorJob extends JobImpl {
         alreadyQueued.addAll(_facade.getPassivelySendKeys());
         int toAdd = MAX_PASSIVE_POOL_SIZE - alreadyQueued.size();
-        _log.debug("Keys we need to queue up to fill the passive send pool: " + toAdd);
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Keys we need to queue up to fill the passive send pool: " + toAdd);
         if (toAdd <= 0) return new HashSet();
         alreadyQueued.addAll(_facade.getExplicitSendKeys());
@@ -85,14 +87,16 @@ class DataRepublishingSelectorJob extends JobImpl {
         Set keys = _facade.getDataStore().getKeys();
         keys.removeAll(alreadyQueued);
-        _log.debug("Total number of keys in the datastore: " + keys.size());
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Total number of keys in the datastore: " + keys.size());
         TreeMap toSend = new TreeMap();
         for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
             Hash key = (Hash)iter.next();
             Long lastPublished = _facade.getLastSent(key);
             long publishRank = rankPublishNeed(key, lastPublished);
-            _log.debug("Publish rank for " + key + ": " + publishRank);
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Publish rank for " + key + ": " + publishRank);
             if (publishRank > 0) {
                 while (toSend.containsKey(new Long(publishRank)))
                     publishRank++;
@@ -124,7 +128,10 @@ class DataRepublishingSelectorJob extends JobImpl {
                 // last time it was sent was before the last send period
                 return KBucketSet.NUM_BUCKETS - bucket;
             } else {
-                _log.info("Not republishing router " + key + " since it is really old [" + (now-ri.getPublished()) + "ms]");
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Not republishing router " + key
+                              + " since it is really old ["
+                              + (now-ri.getPublished()) + "ms]");
                 return -2;
             }
         } else {
@@ -134,11 +141,15 @@ class DataRepublishingSelectorJob extends JobImpl {
                 // last time it was sent was before the last send period
                 return KBucketSet.NUM_BUCKETS - bucket;
             } else {
-                _log.info("Not republishing leaseSet " + key + " since it is really old [" + (now-ls.getEarliestLeaseDate()) + "ms]");
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Not republishing leaseSet " + key
+                              + " since it is really old ["
+                              + (now-ls.getEarliestLeaseDate()) + "ms]");
                 return -3;
             }
         } else {
-            _log.info("Key " + key + " is not a leaseSet or routerInfo, definitely not publishing it");
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Key " + key + " is not a leaseSet or routerInfo, definitely not publishing it");
             return -5;
         }
     }
@@ -151,7 +162,9 @@ class DataRepublishingSelectorJob extends JobImpl {
             // sent it within 5 minutes
             int val = _context.random().nextInt(LEASE_REBROADCAST_PROBABILITY_SCALE);
             if (val <= LEASE_REBROADCAST_PROBABILITY) {
-                _log.info("Randomized rebroadcast of leases tells us to send " + key + ": " + val);
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Randomized rebroadcast of leases tells us to send "
+                              + key + ": " + val);
                 return 1;
             }
         }
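For a sense of scale, assuming illustrative values of LEASE_REBROADCAST_PROBABILITY = 5 and LEASE_REBROADCAST_PROBABILITY_SCALE = 1000 (both constants are defined outside this hunk), the randomized check above works out to:

    // Assumed values - the real constants are defined elsewhere in this class:
    // LEASE_REBROADCAST_PROBABILITY = 5, LEASE_REBROADCAST_PROBABILITY_SCALE = 1000.
    // nextInt(1000) returns 0..999, so (val <= 5) fires 6 times in 1000,
    // giving each recently-sent lease a ~0.6% chance per run of an early resend.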

ExploreJob.java

@@ -33,10 +33,13 @@ class ExploreJob extends SearchJob {
     private PeerSelector _peerSelector;
     /** how long each exploration should run for (currently a trivial 20 seconds) */
-    private final static long MAX_EXPLORE_TIME = 30*1000;
+    private static final long MAX_EXPLORE_TIME = 20*1000;
     /** how many of the peers closest to the key being explored do we want to explicitly say "dont send me this"? */
-    private final static int NUM_CLOSEST_TO_IGNORE = 3;
+    private static final int NUM_CLOSEST_TO_IGNORE = 3;
+    /** how many peers to explore through concurrently */
+    private static final int EXPLORE_BREDTH = 1;
     /**
      * Create a new search for the routingKey specified
@@ -96,6 +99,20 @@ class ExploreJob extends SearchJob {
         return buildMessage(null, _context.router().getRouterInfo(), expiration);
     }
+
+    /** max # of concurrent searches */
+    protected int getBredth() { return EXPLORE_BREDTH; }
+
+    /**
+     * We've gotten a search reply that contained the specified
+     * number of peers that we didn't know about before.
+     *
+     */
+    protected void newPeersFound(int numNewPeers) {
+        // who cares about how many new peers. well, maybe we do. but for now,
+        // we'll do the simplest thing that could possibly work.
+        _facade.setLastExploreNewDate(_context.clock().now());
+    }
+
     /*
      * We could override searchNext to see if we actually fill up a kbucket before
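The getBredth() hook is presumably consulted by the SearchJob base class when fanning out; a hypothetical sketch of that use (the base class is not part of this diff, and the helper names here are invented):

    // Hypothetical fan-out loop in SearchJob (not shown in this commit):
    // with EXPLORE_BREDTH = 1, each exploration queries one peer at a time.
    while (getActiveSearchCount() < getBredth() && hasUntriedPeers())
        searchNext();   // searchNext() is mentioned in the comment above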

KBucketSet.java

@@ -54,7 +54,8 @@ class KBucketSet {
             if (numInBucket > BUCKET_SIZE) {
                 // perhaps queue up coallesce job? naaahh.. lets let 'er grow for now
             }
-            _log.debug("Peer " + peer + " added to bucket " + bucket);
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Peer " + peer + " added to bucket " + bucket);
             return oldSize != numInBucket;
         } else {
             throw new IllegalArgumentException("Unable to pick a bucket. wtf!");

KademliaNetworkDatabaseFacade.java

@@ -52,6 +52,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
     private boolean _initialized;
     /** Clock independent time of when we started up */
     private long _started;
+    private StartExplorersJob _exploreJob;
+    /** when was the last time an exploration found something new? */
+    private long _lastExploreNew;
     private PeerSelector _peerSelector;
     private RouterContext _context;
     /**
@@ -84,11 +87,19 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         _initialized = false;
         _peerSelector = new PeerSelector(_context);
         _publishingLeaseSets = new HashSet(8);
+        _lastExploreNew = 0;
     }
     KBucketSet getKBuckets() { return _kb; }
     DataStore getDataStore() { return _ds; }
+    long getLastExploreNewDate() { return _lastExploreNew; }
+    void setLastExploreNewDate(long when) {
+        _lastExploreNew = when;
+        if (_exploreJob != null)
+            _exploreJob.updateExploreSchedule();
+    }
     public Set getExplicitSendKeys() {
         if (!_initialized) return null;
         synchronized (_explicitSendKeys) {
@@ -200,8 +211,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
             _context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this));
             // fill the search queue with random keys in buckets that are too small
             _context.jobQueue().addJob(new ExploreKeySelectorJob(_context, this));
+            _exploreJob = new StartExplorersJob(_context, this);
             // fire off a group of searches from the explore pool
-            _context.jobQueue().addJob(new StartExplorersJob(_context, this));
+            _context.jobQueue().addJob(_exploreJob);
         } else {
             _log.warn("Operating in quiet mode - not exploring or pushing data proactively, simply reactively");
             _log.warn("This should NOT be used in production");
@@ -230,8 +242,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
                 if (_log.shouldLog(Log.INFO))
                     _log.info("Selected hash " + rhash.toBase64() + " is not stored locally");
             } else if ( !(ds instanceof RouterInfo) ) {
-                if (_log.shouldLog(Log.WARN))
-                    _log.warn("Selected router hash " + rhash.toBase64() + " is NOT a routerInfo!");
+                // could be a LeaseSet
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Selected router hash " + rhash.toBase64() + " is NOT a routerInfo!");
             } else {
                 rv.add(ds);
             }
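Taken together, the facade now sits in the middle of the feedback loop this commit wires up (simplified call chain):

    // ExploreJob.newPeersFound(n)                   (ExploreJob.java above)
    //   -> facade.setLastExploreNewDate(now)
    //     -> _exploreJob.updateExploreSchedule()    (StartExplorersJob.java below)
    //       -> getTiming().setStartAfter(now + getNextRunDelay())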

StartExplorersJob.java

@@ -27,8 +27,12 @@ class StartExplorersJob extends JobImpl {
     private Log _log;
     private KademliaNetworkDatabaseFacade _facade;
     private final static long RERUN_DELAY_MS = 3*60*1000; // every 3 minutes, explore MAX_PER_RUN keys
-    private final static int MAX_PER_RUN = 3; // don't explore more than 1 bucket at a time
+    /** don't explore more than 1 bucket at a time */
+    private static final int MAX_PER_RUN = 1;
+    /** dont explore the network more often than once every minute */
+    private static final int MIN_RERUN_DELAY_MS = 60*1000;
+    /** explore the network at least once every thirty minutes */
+    private static final int MAX_RERUN_DELAY_MS = 30*60*1000;
     public StartExplorersJob(RouterContext context, KademliaNetworkDatabaseFacade facade) {
         super(context);
@@ -46,7 +50,32 @@ class StartExplorersJob extends JobImpl {
             //_log.info("Starting explorer for " + key, new Exception("Exploring!"));
             _context.jobQueue().addJob(new ExploreJob(_context, _facade, key));
         }
-        requeue(RERUN_DELAY_MS);
+        long delay = getNextRunDelay();
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Reenqueueing the exploration with a delay of " + delay);
+        requeue(delay);
     }
+
+    /**
+     * the exploration has found some new peers - update the schedule so that
+     * we'll explore appropriately.
+     */
+    public void updateExploreSchedule() {
+        long delay = getNextRunDelay();
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Updating exploration schedule with a delay of " + delay);
+        getTiming().setStartAfter(_context.clock().now() + delay);
+    }
+
+    /** how long should we wait before exploring? */
+    private long getNextRunDelay() {
+        long delay = _context.clock().now() - _facade.getLastExploreNewDate();
+        if (delay < MIN_RERUN_DELAY_MS)
+            return MIN_RERUN_DELAY_MS;
+        else if (delay > MAX_RERUN_DELAY_MS)
+            return MAX_RERUN_DELAY_MS;
+        else
+            return delay;
+    }
/**