* FloodOnlySearchJob:
  - Fix up field hiding and duplicate overrides
  - Other javadoc and java 5 improvements
zzz
2009-11-09 17:23:22 +00:00
parent 37a2ccca95
commit b045fb3a45
4 changed files with 73 additions and 71 deletions
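The "field hiding and duplicate overrides" in the commit message refer to a standard Java pitfall: a subclass that re-declares a field with the same name as a superclass field ends up with two distinct fields, and superclass code silently keeps reading its own copy. A minimal self-contained sketch of the problem (illustrative names only, not router code):

    class Base {
        protected int _timeoutMs = 10;
        int baseView() { return _timeoutMs; }       // always reads Base._timeoutMs
    }

    class Derived extends Base {
        protected int _timeoutMs = 99;              // hides Base._timeoutMs: two fields now exist
        int derivedView() { return _timeoutMs; }    // reads Derived._timeoutMs
    }

    public class FieldHidingDemo {
        public static void main(String[] args) {
            Derived d = new Derived();
            System.out.println(d.baseView());       // prints 10, not 99
            System.out.println(d.derivedView());    // prints 99
        }
    }

The changes below apply the usual cure: FloodSearchJob keeps a single set of protected fields, and FloodOnlySearchJob drops its duplicate declarations, redundant constructor assignments, and the overrides that merely repeated the superclass behavior.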

FloodOnlySearchJob.java

@@ -37,40 +37,25 @@ import net.i2p.util.Log;
  *
  */
 class FloodOnlySearchJob extends FloodSearchJob {
-    protected Log _log;
-    private FloodfillNetworkDatabaseFacade _facade;
-    protected Hash _key;
-    private final List _onFind;
-    private final List _onFailed;
-    private long _expiration;
-    protected int _timeoutMs;
-    private long _origExpiration;
-    private boolean _isLease;
     protected volatile int _lookupsRemaining;
     private volatile boolean _dead;
     private long _created;
     private boolean _shouldProcessDSRM;
-    private final HashSet _unheardFrom;
-    protected final List _out;
+    private final HashSet<Hash> _unheardFrom;
+    private final List<OutNetMessage> _out;
     protected MessageSelector _replySelector;
     protected ReplyJob _onReply;
     protected Job _onTimeout;
     public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
         super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
+        // these override the settings in super
         _log = ctx.logManager().getLog(FloodOnlySearchJob.class);
-        _facade = facade;
-        _key = key;
-        _onFind = new ArrayList();
-        _onFind.add(onFind);
-        _onFailed = new ArrayList();
-        _onFailed.add(onFailed);
         _timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT);
         _expiration = _timeoutMs + ctx.clock().now();
         _origExpiration = _timeoutMs + ctx.clock().now();
-        _isLease = isLease;
-        _lookupsRemaining = 0;
-        _dead = false;
+        // do we need a synchronizedList, since we synch on _out everywhere below...
         _out = Collections.synchronizedList(new ArrayList(2));
         _unheardFrom = new HashSet(CONCURRENT_SEARCHES);
         _replySelector = new FloodOnlyLookupSelector(getContext(), this);
@@ -79,17 +64,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
         _created = System.currentTimeMillis();
         _shouldProcessDSRM = false;
     }
-    @Override
-    void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
-        if (_dead) {
-            getContext().jobQueue().addJob(onFailed);
-        } else {
-            if (onFind != null) synchronized (_onFind) { _onFind.add(onFind); }
-            if (onFailed != null) synchronized (_onFailed) { _onFailed.add(onFailed); }
-        }
-    }
-    @Override
-    public long getExpiration() { return _expiration; }
     public long getCreated() { return _created; }
     public boolean shouldProcessDSRM() { return _shouldProcessDSRM; }
     private static final int CONCURRENT_SEARCHES = 2;
@@ -188,12 +163,6 @@ class FloodOnlySearchJob extends FloodSearchJob {
     @Override
     public String getName() { return "NetDb flood search (phase 1)"; }
-    @Override
-    Hash getKey() { return _key; }
-    @Override
-    void decrementRemaining() { if (_lookupsRemaining > 0) _lookupsRemaining--; }
-    @Override
-    int getLookupsRemaining() { return _lookupsRemaining; }
     /** Note that we heard from the peer */
     void decrementRemaining(Hash peer) {
         decrementRemaining();
@@ -218,8 +187,8 @@ class FloodOnlySearchJob extends FloodSearchJob {
         if (_log.shouldLog(Log.INFO))
             _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created));
         synchronized(_unheardFrom) {
-            for (Iterator iter = _unheardFrom.iterator(); iter.hasNext(); )
-                getContext().profileManager().dbLookupFailed((Hash) iter.next());
+            for (Iterator<Hash> iter = _unheardFrom.iterator(); iter.hasNext(); )
+                getContext().profileManager().dbLookupFailed(iter.next());
         }
         _facade.complete(_key);
         getContext().statManager().addRateData("netDb.failedTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
@@ -248,7 +217,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
         // StoreJob also calls dbStoreSent() which updates the lastHeardFrom timer - this also helps.
         synchronized(_unheardFrom) {
             if (_unheardFrom.size() == 1) {
-                Hash peer = (Hash) _unheardFrom.iterator().next();
+                Hash peer = _unheardFrom.iterator().next();
                 getContext().profileManager().dbLookupSuccessful(peer, System.currentTimeMillis()-_created);
             }
         }
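The new constructor comment ("do we need a synchronizedList, since we synch on _out everywhere below") points at a real trade-off: Collections.synchronizedList makes individual calls such as add() atomic, but iteration and other compound operations still require holding the list's own lock, which is exactly what the explicit synchronized(_out) blocks the comment refers to provide. A small illustrative sketch of that rule (not router code):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SyncListDemo {
        public static void main(String[] args) {
            List<String> out = Collections.synchronizedList(new ArrayList<String>(2));
            out.add("msg1");   // single calls are atomic on the wrapper
            out.add("msg2");
            // Iteration is NOT atomic: hold the list's lock for the whole traversal,
            // the same way the code above synchronizes on _out explicitly.
            synchronized (out) {
                for (String msg : out)
                    System.out.println(msg);
            }
        }
    }

If every access really is wrapped in synchronized(_out), the wrapper is redundant belt-and-suspenders; dropping it is only safe as long as that invariant holds everywhere.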

FloodSearchJob.java

@@ -23,19 +23,22 @@ import net.i2p.util.Log;
  * the normal (kademlia) channels. This should cut down on spurious lookups caused
  * by simple delays in responses from floodfill peers
  *
+ * NOTE: Unused directly - see FloodOnlySearchJob extension which overrides almost everything.
+ * TODO: Comment out or delete what we don't use here.
  */
 public class FloodSearchJob extends JobImpl {
-    private Log _log;
-    private FloodfillNetworkDatabaseFacade _facade;
-    private Hash _key;
-    private final List _onFind;
-    private final List _onFailed;
-    private long _expiration;
-    private int _timeoutMs;
-    private long _origExpiration;
-    private boolean _isLease;
-    private volatile int _lookupsRemaining;
-    private volatile boolean _dead;
+    protected Log _log;
+    protected FloodfillNetworkDatabaseFacade _facade;
+    protected Hash _key;
+    protected final List<Job> _onFind;
+    protected final List<Job> _onFailed;
+    protected long _expiration;
+    protected int _timeoutMs;
+    protected long _origExpiration;
+    protected boolean _isLease;
+    protected volatile int _lookupsRemaining;
+    protected volatile boolean _dead;
     public FloodSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
         super(ctx);
         _log = ctx.logManager().getLog(FloodSearchJob.class);
@@ -86,13 +89,13 @@ public class FloodSearchJob extends JobImpl {
         TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
         if ( (replyTunnel == null) || (outTunnel == null) ) {
             _dead = true;
-            List removed = null;
+            List<Job> removed = null;
             synchronized (_onFailed) {
                 removed = new ArrayList(_onFailed);
                 _onFailed.clear();
             }
             while (removed.size() > 0)
-                getContext().jobQueue().addJob((Job)removed.remove(0));
+                getContext().jobQueue().addJob(removed.remove(0));
             getContext().messageRegistry().unregisterPending(out);
             return;
         }
@@ -117,9 +120,9 @@ public class FloodSearchJob extends JobImpl {
     }
     public String getName() { return "NetDb search (phase 1)"; }
-    Hash getKey() { return _key; }
-    void decrementRemaining() { _lookupsRemaining--; }
-    int getLookupsRemaining() { return _lookupsRemaining; }
+    protected Hash getKey() { return _key; }
+    protected void decrementRemaining() { if (_lookupsRemaining > 0) _lookupsRemaining--; }
+    protected int getLookupsRemaining() { return _lookupsRemaining; }
     void failed() {
         if (_dead) return;
@@ -130,13 +133,13 @@ public class FloodSearchJob extends JobImpl {
         if (timeRemaining > 0) {
             _facade.searchFull(_key, _onFind, _onFailed, timeRemaining, _isLease);
         } else {
-            List removed = null;
+            List<Job> removed = null;
             synchronized (_onFailed) {
                 removed = new ArrayList(_onFailed);
                 _onFailed.clear();
             }
             while (removed.size() > 0)
-                getContext().jobQueue().addJob((Job)removed.remove(0));
+                getContext().jobQueue().addJob(removed.remove(0));
         }
     }
     void success() {
@@ -145,13 +148,13 @@ public class FloodSearchJob extends JobImpl {
             _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful");
         _dead = true;
         _facade.complete(_key);
-        List removed = null;
+        List<Job> removed = null;
         synchronized (_onFind) {
             removed = new ArrayList(_onFind);
             _onFind.clear();
         }
         while (removed.size() > 0)
-            getContext().jobQueue().addJob((Job)removed.remove(0));
+            getContext().jobQueue().addJob(removed.remove(0));
     }
     private static class FloodLookupTimeoutJob extends JobImpl {
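The "java 5 improvements" in this file follow one pattern: the raw callback Lists become List<Job>, so the drain-and-run code can drop its (Job) casts and gets compile-time type checking instead. A minimal sketch of the same copy-under-lock, clear, then run-outside-the-lock idiom (hypothetical names, with Runnable standing in for the router's Job type):

    import java.util.ArrayList;
    import java.util.List;

    public class DrainDemo {
        private final List<Runnable> _onFailed = new ArrayList<Runnable>();

        void addCallback(Runnable r) {
            synchronized (_onFailed) { _onFailed.add(r); }
        }

        void failed() {
            List<Runnable> removed;
            synchronized (_onFailed) {              // copy and clear under the lock
                removed = new ArrayList<Runnable>(_onFailed);
                _onFailed.clear();
            }
            while (removed.size() > 0)              // run the callbacks outside the lock
                removed.remove(0).run();            // no cast needed with a typed list
        }

        public static void main(String[] args) {
            DrainDemo d = new DrainDemo();
            d.addCallback(new Runnable() { public void run() { System.out.println("onFailed ran"); } });
            d.failed();
        }
    }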

FloodfillNetworkDatabaseFacade.java

@@ -179,8 +179,8 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade
         return false;
     }
-    public List getKnownRouterData() {
-        List rv = new ArrayList();
+    public List<RouterInfo> getKnownRouterData() {
+        List<RouterInfo> rv = new ArrayList();
         DataStore ds = getDataStore();
         if (ds != null) {
             Set keys = ds.getKeys();
@@ -188,7 +188,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade
                 for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
                     Object o = getDataStore().get((Hash)iter.next());
                     if (o instanceof RouterInfo)
-                        rv.add(o);
+                        rv.add((RouterInfo)o);
                 }
             }
         }
@@ -237,7 +237,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade
      * Ok, the initial set of searches to the floodfill peers timed out, lets fall back on the
      * wider kademlia-style searches
      */
-    void searchFull(Hash key, List onFind, List onFailed, long timeoutMs, boolean isLease) {
+    void searchFull(Hash key, List<Job> onFind, List<Job> onFailed, long timeoutMs, boolean isLease) {
         synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }
         Job find = null;
@@ -245,13 +245,13 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade
         if (onFind != null) {
             synchronized (onFind) {
                 if (onFind.size() > 0)
-                    find = (Job)onFind.remove(0);
+                    find = onFind.remove(0);
             }
         }
         if (onFailed != null) {
             synchronized (onFailed) {
                 if (onFailed.size() > 0)
-                    fail = (Job)onFailed.remove(0);
+                    fail = onFailed.remove(0);
             }
         }
         SearchJob job = super.search(key, find, fail, timeoutMs, isLease);
@@ -260,14 +260,14 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade
             _log.info("Floodfill search timed out for " + key.toBase64() + ", falling back on normal search (#"
                       + job.getJobId() + ") with " + timeoutMs + " remaining");
         long expiration = timeoutMs + _context.clock().now();
-        List removed = null;
+        List<Job> removed = null;
         if (onFind != null) {
             synchronized (onFind) {
                 removed = new ArrayList(onFind);
                 onFind.clear();
             }
             for (int i = 0; i < removed.size(); i++)
-                job.addDeferred((Job)removed.get(i), null, expiration, isLease);
+                job.addDeferred(removed.get(i), null, expiration, isLease);
             removed = null;
         }
         if (onFailed != null) {
@@ -276,7 +276,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade
                 onFailed.clear();
             }
             for (int i = 0; i < removed.size(); i++)
-                job.addDeferred(null, (Job)removed.get(i), expiration, isLease);
+                job.addDeferred(null, removed.get(i), expiration, isLease);
             removed = null;
         }
     }
@@ -287,8 +287,9 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade
     /** list of the Hashes of currently known floodfill peers;
      *  Returned list will not include our own hash.
+     *  List is not sorted and not shuffled.
      */
-    public List getFloodfillPeers() {
+    public List<Hash> getFloodfillPeers() {
         FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector();
         return sel.selectFloodfillParticipants(getKBuckets());
     }

FloodfillPeerSelector.java

@@ -22,12 +22,19 @@ import net.i2p.router.RouterContext;
 import net.i2p.router.peermanager.PeerProfile;
 import net.i2p.util.Log;
+/**
+ * This is where we implement semi-Kademlia with the floodfills, by
+ * selecting floodfills closest to a given key for
+ * searches and stores.
+ *
+ */
 class FloodfillPeerSelector extends PeerSelector {
     public FloodfillPeerSelector(RouterContext ctx) { super(ctx); }
     /**
      * Pick out peers with the floodfill capacity set, returning them first, but then
      * after they're complete, sort via kademlia.
+     * Puts the floodfill peers that are directly connected first in the list.
      *
      * @return List of Hash for the peers selected
      */
@@ -36,6 +43,13 @@ class FloodfillPeerSelector extends PeerSelector {
         return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets, true);
     }
+    /**
+     * Pick out peers with the floodfill capacity set, returning them first, but then
+     * after they're complete, sort via kademlia.
+     * Does not prefer the floodfill peers that are directly connected.
+     *
+     * @return List of Hash for the peers selected
+     */
     @Override
     public List<Hash> selectNearestExplicitThin(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) {
         return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets, false);
@@ -58,7 +72,7 @@ class FloodfillPeerSelector extends PeerSelector {
     /**
      * @return all floodfills not shitlisted forever. list will not include our own hash
-     *
+     * List is not sorted and not shuffled.
      */
     public List<Hash> selectFloodfillParticipants(KBucketSet kbuckets) {
         if (kbuckets == null) return new ArrayList();
@@ -69,6 +83,7 @@ class FloodfillPeerSelector extends PeerSelector {
     /**
      * @return all floodfills not shitlisted foreverx
+     * @param key the routing key
      * @param maxNumRouters max to return
      * Sorted by closest to the key if > maxNumRouters, otherwise not
      */
@@ -104,7 +119,13 @@ class FloodfillPeerSelector extends PeerSelector {
             _matches = 0;
             _wanted = wanted;
         }
+        /**
+         * @return unsorted list of all with the 'f' mark in their netdb
+         *         except for shitlisted ones.
+         */
         public List<Hash> getFloodfillParticipants() { return _floodfillMatches; }
         private static final int EXTRA_MATCHES = 100;
         public void add(Hash entry) {
             //if (_context.profileOrganizer().isFailing(entry))
@@ -144,6 +165,14 @@ class FloodfillPeerSelector extends PeerSelector {
             return get(howMany, false);
         }
+        /**
+         * @return list of all with the 'f' mark in their netdb except for shitlisted ones.
+         * The list is in 3 groups - unsorted (shuffled) within each group.
+         * Group 1: If preferConnected = true, the peers we are directly
+         *          connected to, that meet the group 2 criteria
+         * Group 2: Netdb published less than 3h ago, no bad send in last 30m.
+         * Group 3: All others
+         */
         public List<Hash> get(int howMany, boolean preferConnected) {
             Collections.shuffle(_floodfillMatches, _context.random());
             List<Hash> rv = new ArrayList(howMany);
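The new javadoc on get() describes a three-group ordering: shuffle once, put directly connected well-behaved floodfills first (when preferConnected is set), then other recently published well-behaved ones, then everything else. A self-contained sketch of that shuffle-then-partition shape (the predicates here are stand-ins; the real criteria are the connection, publish-age, and failed-send checks named in the comment):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    public class GroupedSelectionDemo {
        // Stand-in for a floodfill candidate with the two attributes the javadoc mentions.
        static class Peer {
            final String name; final boolean connected; final boolean recentAndClean;
            Peer(String n, boolean c, boolean r) { name = n; connected = c; recentAndClean = r; }
        }

        // Shuffle once, then emit group 1 (connected + good), group 2 (good), group 3 (the rest),
        // preserving the shuffled order within each group.
        static List<String> get(List<Peer> matches, int howMany, boolean preferConnected, Random rnd) {
            Collections.shuffle(matches, rnd);
            List<String> rv = new ArrayList<String>(howMany);
            List<Peer> okFloodfills = new ArrayList<Peer>();    // group 2
            List<Peer> badFloodfills = new ArrayList<Peer>();   // group 3
            for (Peer p : matches) {
                if (!p.recentAndClean)
                    badFloodfills.add(p);
                else if (preferConnected && p.connected && rv.size() < howMany)
                    rv.add(p.name);                             // group 1 goes straight to the front
                else
                    okFloodfills.add(p);
            }
            for (Peer p : okFloodfills)
                if (rv.size() < howMany) rv.add(p.name);
            for (Peer p : badFloodfills)
                if (rv.size() < howMany) rv.add(p.name);
            return rv;
        }

        public static void main(String[] args) {
            List<Peer> peers = new ArrayList<Peer>();
            peers.add(new Peer("a", true, true));
            peers.add(new Peer("b", false, true));
            peers.add(new Peer("c", false, false));
            System.out.println(get(peers, 2, true, new Random()));
        }
    }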