From e246cd37dd529cfd08b712a02b8d1ca7ca3f1f90 Mon Sep 17 00:00:00 2001 From: jrandom Date: Fri, 30 Apr 2004 07:08:25 +0000 Subject: [PATCH] simple exponential decay on the exploration (min/max frequency=1m/30m, doubling every time we don't find something new) reduce the bredth and duration of explorations republish less often (every 60s send out one random one [or however many explicit ones]) logging --- .../networkdb/kademlia/DataPublisherJob.java | 14 +++++--- .../kademlia/DataRepublishingSelectorJob.java | 31 +++++++++++----- .../router/networkdb/kademlia/ExploreJob.java | 21 +++++++++-- .../router/networkdb/kademlia/KBucketSet.java | 3 +- .../KademliaNetworkDatabaseFacade.java | 19 ++++++++-- .../networkdb/kademlia/StartExplorersJob.java | 35 +++++++++++++++++-- 6 files changed, 100 insertions(+), 23 deletions(-) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java index 6890d57ce0..9f08aba68d 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java @@ -25,8 +25,8 @@ import net.i2p.router.RouterContext; class DataPublisherJob extends JobImpl { private Log _log; private KademliaNetworkDatabaseFacade _facade; - private final static long RERUN_DELAY_MS = 30*1000; - private final static int MAX_SEND_PER_RUN = 5; // publish no more than 5 at a time + private final static long RERUN_DELAY_MS = 60*1000; + private final static int MAX_SEND_PER_RUN = 1; // publish no more than 2 at a time private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data public DataPublisherJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) { @@ -39,18 +39,22 @@ class DataPublisherJob extends JobImpl { public String getName() { return "Data Publisher Job"; } public void runJob() { Set toSend = selectKeysToSend(); - _log.info("Keys being published in this timeslice: " + toSend); + if (_log.shouldLog(Log.INFO)) + _log.info("Keys being published in this timeslice: " + toSend); for (Iterator iter = toSend.iterator(); iter.hasNext(); ) { Hash key = (Hash)iter.next(); DataStructure data = _facade.getDataStore().get(key); if (data == null) { - _log.warn("Trying to send a key we dont have? " + key); + if (_log.shouldLog(Log.WARN)) + _log.warn("Trying to send a key we dont have? " + key); continue; } if (data instanceof LeaseSet) { LeaseSet ls = (LeaseSet)data; if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) { - _log.warn("Not publishing a lease that isn't current - " + key, new Exception("Publish expired lease?")); + if (_log.shouldLog(Log.WARN)) + _log.warn("Not publishing a lease that isn't current - " + key, + new Exception("Publish expired lease?")); } } StoreJob store = new StoreJob(_context, _facade, key, data, null, null, STORE_TIMEOUT); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java index 717284e256..bed3455cd8 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java @@ -28,7 +28,7 @@ class DataRepublishingSelectorJob extends JobImpl { private KademliaNetworkDatabaseFacade _facade; private final static long RERUN_DELAY_MS = 1*60*1000; - public final static int MAX_PASSIVE_POOL_SIZE = 30; // no need to have the pool be too big + public final static int MAX_PASSIVE_POOL_SIZE = 10; // no need to have the pool be too big /** * For every bucket away from us, resend period increases by 5 minutes - so we resend @@ -60,7 +60,8 @@ class DataRepublishingSelectorJob extends JobImpl { public String getName() { return "Data Publisher Job"; } public void runJob() { Set toSend = selectKeysToSend(); - _log.info("Keys being queued up for publishing: " + toSend); + if (_log.shouldLog(Log.INFO)) + _log.info("Keys being queued up for publishing: " + toSend); _facade.queueForPublishing(toSend); requeue(RERUN_DELAY_MS); } @@ -77,7 +78,8 @@ class DataRepublishingSelectorJob extends JobImpl { alreadyQueued.addAll(_facade.getPassivelySendKeys()); int toAdd = MAX_PASSIVE_POOL_SIZE - alreadyQueued.size(); - _log.debug("Keys we need to queue up to fill the passive send pool: " + toAdd); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Keys we need to queue up to fill the passive send pool: " + toAdd); if (toAdd <= 0) return new HashSet(); alreadyQueued.addAll(_facade.getExplicitSendKeys()); @@ -85,14 +87,16 @@ class DataRepublishingSelectorJob extends JobImpl { Set keys = _facade.getDataStore().getKeys(); keys.removeAll(alreadyQueued); - _log.debug("Total number of keys in the datastore: " + keys.size()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Total number of keys in the datastore: " + keys.size()); TreeMap toSend = new TreeMap(); for (Iterator iter = keys.iterator(); iter.hasNext(); ) { Hash key = (Hash)iter.next(); Long lastPublished = _facade.getLastSent(key); long publishRank = rankPublishNeed(key, lastPublished); - _log.debug("Publish rank for " + key + ": " + publishRank); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Publish rank for " + key + ": " + publishRank); if (publishRank > 0) { while (toSend.containsKey(new Long(publishRank))) publishRank++; @@ -124,7 +128,10 @@ class DataRepublishingSelectorJob extends JobImpl { // last time it was sent was before the last send period return KBucketSet.NUM_BUCKETS - bucket; } else { - _log.info("Not republishing router " + key + " since it is really old [" + (now-ri.getPublished()) + "ms]"); + if (_log.shouldLog(Log.INFO)) + _log.info("Not republishing router " + key + + " since it is really old [" + + (now-ri.getPublished()) + "ms]"); return -2; } } else { @@ -134,11 +141,15 @@ class DataRepublishingSelectorJob extends JobImpl { // last time it was sent was before the last send period return KBucketSet.NUM_BUCKETS - bucket; } else { - _log.info("Not republishing leaseSet " + key + " since it is really old [" + (now-ls.getEarliestLeaseDate()) + "ms]"); + if (_log.shouldLog(Log.INFO)) + _log.info("Not republishing leaseSet " + key + + " since it is really old [" + + (now-ls.getEarliestLeaseDate()) + "ms]"); return -3; } } else { - _log.info("Key " + key + " is not a leaseSet or routerInfo, definitely not publishing it"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Key " + key + " is not a leaseSet or routerInfo, definitely not publishing it"); return -5; } } @@ -151,7 +162,9 @@ class DataRepublishingSelectorJob extends JobImpl { // sent it within 5 minutes int val = _context.random().nextInt(LEASE_REBROADCAST_PROBABILITY_SCALE); if (val <= LEASE_REBROADCAST_PROBABILITY) { - _log.info("Randomized rebroadcast of leases tells us to send " + key + ": " + val); + if (_log.shouldLog(Log.INFO)) + _log.info("Randomized rebroadcast of leases tells us to send " + + key + ": " + val); return 1; } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java index 687c2c9572..6c7cd463d8 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java @@ -33,10 +33,13 @@ class ExploreJob extends SearchJob { private PeerSelector _peerSelector; /** how long each exploration should run for (currently a trivial 20 seconds) */ - private final static long MAX_EXPLORE_TIME = 30*1000; + private static final long MAX_EXPLORE_TIME = 20*1000; /** how many of the peers closest to the key being explored do we want to explicitly say "dont send me this"? */ - private final static int NUM_CLOSEST_TO_IGNORE = 3; + private static final int NUM_CLOSEST_TO_IGNORE = 3; + + /** how many peers to explore through concurrently */ + private static final int EXPLORE_BREDTH = 1; /** * Create a new search for the routingKey specified @@ -96,6 +99,20 @@ class ExploreJob extends SearchJob { return buildMessage(null, _context.router().getRouterInfo(), expiration); } + /** max # of concurrent searches */ + protected int getBredth() { return EXPLORE_BREDTH; } + + + /** + * We've gotten a search reply that contained the specified + * number of peers that we didn't know about before. + * + */ + protected void newPeersFound(int numNewPeers) { + // who cares about how many new peers. well, maybe we do. but for now, + // we'll do the simplest thing that could possibly work. + _facade.setLastExploreNewDate(_context.clock().now()); + } /* * We could override searchNext to see if we actually fill up a kbucket before diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KBucketSet.java b/router/java/src/net/i2p/router/networkdb/kademlia/KBucketSet.java index ffe52d63a3..33f2e0970e 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KBucketSet.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KBucketSet.java @@ -54,7 +54,8 @@ class KBucketSet { if (numInBucket > BUCKET_SIZE) { // perhaps queue up coallesce job? naaahh.. lets let 'er grow for now } - _log.debug("Peer " + peer + " added to bucket " + bucket); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Peer " + peer + " added to bucket " + bucket); return oldSize != numInBucket; } else { throw new IllegalArgumentException("Unable to pick a bucket. wtf!"); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index ed44c9c458..6d3e3fab19 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -52,6 +52,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { private boolean _initialized; /** Clock independent time of when we started up */ private long _started; + private StartExplorersJob _exploreJob; + /** when was the last time an exploration found something new? */ + private long _lastExploreNew; private PeerSelector _peerSelector; private RouterContext _context; /** @@ -84,11 +87,19 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _initialized = false; _peerSelector = new PeerSelector(_context); _publishingLeaseSets = new HashSet(8); + _lastExploreNew = 0; } KBucketSet getKBuckets() { return _kb; } DataStore getDataStore() { return _ds; } + long getLastExploreNewDate() { return _lastExploreNew; } + void setLastExploreNewDate(long when) { + _lastExploreNew = when; + if (_exploreJob != null) + _exploreJob.updateExploreSchedule(); + } + public Set getExplicitSendKeys() { if (!_initialized) return null; synchronized (_explicitSendKeys) { @@ -200,8 +211,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this)); // fill the search queue with random keys in buckets that are too small _context.jobQueue().addJob(new ExploreKeySelectorJob(_context, this)); + _exploreJob = new StartExplorersJob(_context, this); // fire off a group of searches from the explore pool - _context.jobQueue().addJob(new StartExplorersJob(_context, this)); + _context.jobQueue().addJob(_exploreJob); } else { _log.warn("Operating in quiet mode - not exploring or pushing data proactively, simply reactively"); _log.warn("This should NOT be used in production"); @@ -230,8 +242,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { if (_log.shouldLog(Log.INFO)) _log.info("Selected hash " + rhash.toBase64() + " is not stored locally"); } else if ( !(ds instanceof RouterInfo) ) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Selected router hash " + rhash.toBase64() + " is NOT a routerInfo!"); + // could be a LeaseSet + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Selected router hash " + rhash.toBase64() + " is NOT a routerInfo!"); } else { rv.add(ds); } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java index 1a86c1e84b..aae74dcb03 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java @@ -27,8 +27,12 @@ class StartExplorersJob extends JobImpl { private Log _log; private KademliaNetworkDatabaseFacade _facade; - private final static long RERUN_DELAY_MS = 3*60*1000; // every 3 minutes, explore MAX_PER_RUN keys - private final static int MAX_PER_RUN = 3; // don't explore more than 1 bucket at a time + /** don't explore more than 1 bucket at a time */ + private static final int MAX_PER_RUN = 1; + /** dont explore the network more often than once every minute */ + private static final int MIN_RERUN_DELAY_MS = 60*1000; + /** explore the network at least once every thirty minutes */ + private static final int MAX_RERUN_DELAY_MS = 30*60*1000; public StartExplorersJob(RouterContext context, KademliaNetworkDatabaseFacade facade) { super(context); @@ -46,7 +50,32 @@ class StartExplorersJob extends JobImpl { //_log.info("Starting explorer for " + key, new Exception("Exploring!")); _context.jobQueue().addJob(new ExploreJob(_context, _facade, key)); } - requeue(RERUN_DELAY_MS); + long delay = getNextRunDelay(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Reenqueueing the exploration with a delay of " + delay); + requeue(delay); + } + + /** + * the exploration has found some new peers - update the schedule so that + * we'll explore appropriately. + */ + public void updateExploreSchedule() { + long delay = getNextRunDelay(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Updating exploration schedule with a delay of " + delay); + getTiming().setStartAfter(_context.clock().now() + delay); + } + + /** how long should we wait before exploring? */ + private long getNextRunDelay() { + long delay = _context.clock().now() - _facade.getLastExploreNewDate(); + if (delay < MIN_RERUN_DELAY_MS) + return MIN_RERUN_DELAY_MS; + else if (delay > MAX_RERUN_DELAY_MS) + return MAX_RERUN_DELAY_MS; + else + return delay; } /**