split classes into their own files

zzz committed on 2008-12-04 21:56:22 +00:00
parent e9f27c60dd
commit dcf4bb595f
10 changed files with 393 additions and 330 deletions

View File

@@ -0,0 +1,66 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
private Log _log;
private FloodOnlySearchJob _search;
private DatabaseSearchReplyMessage _dsrm;
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_search = job;
_dsrm = null;
}
public void runJob() {
if ( (getContext().netDb().lookupLeaseSetLocally(_search.getKey()) != null) ||
(getContext().netDb().lookupRouterInfoLocally(_search.getKey()) != null) ) {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": search match and found locally");
_search.success();
} else {
int remaining = _search.getLookupsRemaining();
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": got a DatabaseSearchReply when we were looking for "
+ _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
// netDb reply pointing us at other people
// Only process if we don't know enough floodfills
// This only works if both peers reply; otherwise we aren't called - should be fixed
if (_search.shouldProcessDSRM() && _dsrm != null) {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": Processing DatabaseSearchReply");
// Chase the hashes from the reply
getContext().jobQueue().addJob(new SingleLookupJob(getContext(), _dsrm));
}
_search.failed();
}
}
public String getName() { return "NetDb flood search (phase 1) match"; }
public void setMessage(I2NPMessage message) {
if (message instanceof DatabaseSearchReplyMessage) {
// a dsrm is only passed in when there are no more lookups remaining
// If more than one peer sent one, we only process the last one
// And sadly if the first peer sends a DSRM and the second one times out,
// this won't get called...
_dsrm = (DatabaseSearchReplyMessage) message;
_search.failed();
return;
}
try {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
if (dsm.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET)
getContext().netDb().store(dsm.getKey(), dsm.getLeaseSet());
else
getContext().netDb().store(dsm.getKey(), dsm.getRouterInfo());
} catch (IllegalArgumentException iae) {
if (_log.shouldLog(Log.WARN))
_log.warn(_search.getJobId() + ": Received an invalid store reply", iae);
}
}
}
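This match job is one third of the lookup machinery this commit splits out; the selector and timeout job in the next two files complete it. A minimal sketch of how the three are presumably wired together inside FloodOnlySearchJob (that wiring is outside this diff), modeled on the registerPending() call visible in SingleSearchJob.runJob() later in this commit:

    // Sketch only: field names follow the registerPending() call in SingleSearchJob below;
    // the actual FloodOnlySearchJob code is not part of this hunk.
    _onReply = new FloodOnlyLookupMatchJob(getContext(), this);
    _onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
    _replySelector = new FloodOnlyLookupSelector(getContext(), this);
    // The registry hands a matched message to _onReply via setMessage() and runs it;
    // if nothing matches within _timeoutMs, _onTimeout fires instead.
    getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);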

View File

@@ -0,0 +1,51 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.MessageSelector;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
class FloodOnlyLookupSelector implements MessageSelector {
private RouterContext _context;
private FloodOnlySearchJob _search;
private boolean _matchFound;
private Log _log;
public FloodOnlyLookupSelector(RouterContext ctx, FloodOnlySearchJob search) {
_context = ctx;
_search = search;
_log = ctx.logManager().getLog(getClass());
_matchFound = false;
}
public boolean continueMatching() {
return _search.getLookupsRemaining() > 0 && !_matchFound && _context.clock().now() < getExpiration();
}
public long getExpiration() { return (_matchFound ? -1 : _search.getExpiration()); }
public boolean isMatch(I2NPMessage message) {
if (message == null) return false;
if (message instanceof DatabaseStoreMessage) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
// is it worth making sure the reply came in on the right tunnel?
if (_search.getKey().equals(dsm.getKey())) {
_search.decrementRemaining();
_matchFound = true;
return true;
}
} else if (message instanceof DatabaseSearchReplyMessage) {
DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message;
if (_search.getKey().equals(dsrm.getSearchKey())) {
_search.decrementRemaining(dsrm.getFromHash());
// assume 0 old, all new, 0 invalid, 0 dup
_context.profileManager().dbLookupReply(dsrm.getFromHash(), 0, dsrm.getNumReplies(), 0, 0,
System.currentTimeMillis()-_search.getCreated());
if (_search.getLookupsRemaining() <= 0)
return true; // ok, no more left, so time to fail
else
return false;
}
}
return false;
}
}
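For readers new to the MessageSelector interface: the registry tests each inbound message with isMatch(), delivers matches to the paired ReplyJob, and keeps the selector registered only while continueMatching() holds. A hypothetical dispatch loop illustrating that contract (dispatch() and dropSelector() are invented names for illustration, not router API):

    // Hypothetical illustration of the MessageSelector contract; not router code.
    void dispatch(I2NPMessage msg, MessageSelector sel, ReplyJob onReply) {
        if (sel.isMatch(msg)) {
            onReply.setMessage(msg); // e.g. FloodOnlyLookupMatchJob.setMessage()
            onReply.runJob();        // its runJob() then decides success vs. failed
        }
        if (!sel.continueMatching())
            dropSelector(sel);       // matched, lookups exhausted, or expired
    }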

View File

@@ -0,0 +1,21 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
class FloodOnlyLookupTimeoutJob extends JobImpl {
private FloodSearchJob _search;
private Log _log;
public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodOnlySearchJob job) {
super(ctx);
_search = job;
_log = ctx.logManager().getLog(getClass());
}
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": search timed out");
_search.failed();
}
public String getName() { return "NetDb flood search (phase 1) timeout"; }
}

View File

@@ -253,189 +253,3 @@ class FloodOnlySearchJob extends FloodSearchJob {
}
}
}
class FloodOnlyLookupTimeoutJob extends JobImpl {
private FloodSearchJob _search;
private Log _log;
public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodOnlySearchJob job) {
super(ctx);
_search = job;
_log = ctx.logManager().getLog(getClass());
}
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": search timed out");
_search.failed();
}
public String getName() { return "NetDb flood search (phase 1) timeout"; }
}
class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
private Log _log;
private FloodOnlySearchJob _search;
private DatabaseSearchReplyMessage _dsrm;
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_search = job;
_dsrm = null;
}
public void runJob() {
if ( (getContext().netDb().lookupLeaseSetLocally(_search.getKey()) != null) ||
(getContext().netDb().lookupRouterInfoLocally(_search.getKey()) != null) ) {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": search match and found locally");
_search.success();
} else {
int remaining = _search.getLookupsRemaining();
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": got a DatabaseSearchReply when we were looking for "
+ _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
// netDb reply pointing us at other people
// Only process if we don't know enough floodfills
// This only works if both peers reply; otherwise we aren't called - should be fixed
if (_search.shouldProcessDSRM() && _dsrm != null) {
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": Processing DatabaseSearchReply");
// Chase the hashes from the reply
getContext().jobQueue().addJob(new SingleLookupJob(getContext(), _dsrm));
}
_search.failed();
}
}
public String getName() { return "NetDb flood search (phase 1) match"; }
public void setMessage(I2NPMessage message) {
if (message instanceof DatabaseSearchReplyMessage) {
// a dsrm is only passed in when there are no more lookups remaining
// If more than one peer sent one, we only process the last one
// And sadly if the first peer sends a DSRM and the second one times out,
// this won't get called...
_dsrm = (DatabaseSearchReplyMessage) message;
_search.failed();
return;
}
try {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
if (dsm.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET)
getContext().netDb().store(dsm.getKey(), dsm.getLeaseSet());
else
getContext().netDb().store(dsm.getKey(), dsm.getRouterInfo());
} catch (IllegalArgumentException iae) {
if (_log.shouldLog(Log.WARN))
_log.warn(_search.getJobId() + ": Received an invalid store reply", iae);
}
}
}
class FloodOnlyLookupSelector implements MessageSelector {
private RouterContext _context;
private FloodOnlySearchJob _search;
private boolean _matchFound;
private Log _log;
public FloodOnlyLookupSelector(RouterContext ctx, FloodOnlySearchJob search) {
_context = ctx;
_search = search;
_log = ctx.logManager().getLog(getClass());
_matchFound = false;
}
public boolean continueMatching() {
return _search.getLookupsRemaining() > 0 && !_matchFound && _context.clock().now() < getExpiration();
}
public long getExpiration() { return (_matchFound ? -1 : _search.getExpiration()); }
public boolean isMatch(I2NPMessage message) {
if (message == null) return false;
if (message instanceof DatabaseStoreMessage) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
// is it worth making sure the reply came in on the right tunnel?
if (_search.getKey().equals(dsm.getKey())) {
_search.decrementRemaining();
_matchFound = true;
return true;
}
} else if (message instanceof DatabaseSearchReplyMessage) {
DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message;
if (_search.getKey().equals(dsrm.getSearchKey())) {
_search.decrementRemaining(dsrm.getFromHash());
// assume 0 old, all new, 0 invalid, 0 dup
_context.profileManager().dbLookupReply(dsrm.getFromHash(), 0, dsrm.getNumReplies(), 0, 0,
System.currentTimeMillis()-_search.getCreated());
if (_search.getLookupsRemaining() <= 0)
return true; // ok, no more left, so time to fail
else
return false;
}
}
return false;
}
}
/** Below here, only used to lookup the DSRM reply hashes when we are short of floodfills **/
/**
* Ask the peer who sent us the DSRM for the RouterInfos.
* A simple version of SearchReplyJob in SearchJob.java.
* Skip the profile updates - this should be rare.
*
*/
class SingleLookupJob extends JobImpl {
private Log _log;
private DatabaseSearchReplyMessage _dsrm;
public SingleLookupJob(RouterContext ctx, DatabaseSearchReplyMessage dsrm) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_dsrm = dsrm;
}
public void runJob() {
Hash from = _dsrm.getFromHash();
for (int i = 0; i < _dsrm.getNumReplies(); i++) {
Hash peer = _dsrm.getReply(i);
if (peer.equals(getContext().routerHash())) // us
continue;
if (getContext().netDb().lookupRouterInfoLocally(peer) == null)
getContext().jobQueue().addJob(new SingleSearchJob(getContext(), peer, from));
}
}
public String getName() { return "NetDb process DSRM"; }
}
/**
* Ask a single peer for a single key.
* This isn't really a flood-only search job at all, but we extend
* FloodOnlySearchJob so we can use the same selectors, etc.
*
*/
class SingleSearchJob extends FloodOnlySearchJob {
Hash _to;
OutNetMessage _onm;
public SingleSearchJob(RouterContext ctx, Hash key, Hash to) {
// warning, null FloodfillNetworkDatabaseFacade ...
// define our own failed() and success() below so _facade isn't used.
super(ctx, null, key, null, null, 5*1000, false);
_to = to;
}
public String getName() { return "NetDb search key from DSRM"; }
public boolean shouldProcessDSRM() { return false; } // don't loop
public void runJob() {
_onm = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
if ( (replyTunnel == null) || (outTunnel == null) ) {
failed();
return;
}
dlm.setFrom(replyTunnel.getPeer(0));
dlm.setMessageExpiration(getContext().clock().now()+5*1000);
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
dlm.setSearchKey(_key);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Single search for " + _key.toBase64() + " to " + _to.toBase64());
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), _to);
_lookupsRemaining = 1;
}
void failed() {
getContext().messageRegistry().unregisterPending(_onm);
}
void success() {}
}

View File

@@ -231,6 +231,7 @@ class PersistentDataStore extends TransientDataStore {
}
}
/** This is only for manual reseeding? Why bother every 60 sec??? */
private class ReadJob extends JobImpl {
private boolean _alreadyWarned;
public ReadJob() {

View File

@@ -775,146 +775,3 @@ class SearchJob extends JobImpl {
boolean add(Hash peer) { return _facade.getKBuckets().add(peer); }
void decrementOutstandingFloodfillSearches() { _floodfillSearchesOutstanding--; }
}
class SearchReplyJob extends JobImpl {
private DatabaseSearchReplyMessage _msg;
private Log _log;
/**
* Peer who we think sent us the reply. Note: could be spoofed! If the
* attacker knew we were searching for a particular key from a
* particular peer, they could send us some searchReply messages with
* shitty values, trying to get us to consider that peer unreliable.
* Potential fixes include either authenticated 'from' address or use a
* nonce in the search + searchReply (and check for it in the selector).
*
*/
private Hash _peer;
private int _curIndex;
private int _invalidPeers;
private int _seenPeers;
private int _newPeers;
private int _duplicatePeers;
private int _repliesPendingVerification;
private long _duration;
private SearchJob _searchJob;
public SearchReplyJob(RouterContext enclosingContext, SearchJob job, DatabaseSearchReplyMessage message, Hash peer, long duration) {
super(enclosingContext);
_log = enclosingContext.logManager().getLog(getClass());
_searchJob = job;
_msg = message;
_peer = peer;
_curIndex = 0;
_invalidPeers = 0;
_seenPeers = 0;
_newPeers = 0;
_duplicatePeers = 0;
_repliesPendingVerification = 0;
if (duration > 0)
_duration = duration;
else
_duration = 0;
}
public String getName() { return "Process Reply for Kademlia Search"; }
public void runJob() {
if (_curIndex >= _msg.getNumReplies()) {
if (_repliesPendingVerification > 0) {
// we received new references from the peer, but still
// haven't verified all of them, so let's give it more time
requeue(_searchJob.timeoutMs());
} else {
// either they didn't tell us anything new or we have verified
// (or failed to verify) all of them. we're done
getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
_invalidPeers, _duplicatePeers, _duration);
if (_newPeers > 0)
_searchJob.newPeersFound(_newPeers);
}
} else {
Hash peer = _msg.getReply(_curIndex);
boolean shouldAdd = false;
RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer);
if (info == null) {
// if the peer is giving us lots of bad peer references,
// dont try to fetch them.
boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer);
if (!sendsBadInfo) {
// we don't need to search for everything we're given here - only ones that
// are next in our search path...
// note: no need to think about shitlisted targets in the netdb search, given
// the floodfill's behavior
// This keeps us from continually chasing blocklisted floodfills
if (getContext().shitlist().isShitlisted(peer)) {
// if (_log.shouldLog(Log.INFO))
// _log.info("Not looking for a shitlisted peer...");
// getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
} else {
//getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs);
//_repliesPendingVerification++;
shouldAdd = true;
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64());
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
}
}
if (_searchJob.wasAttempted(peer)) {
_duplicatePeers++;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": dbSearchReply received on search referencing router " + peer);
if (shouldAdd) {
if (_searchJob.add(peer))
_newPeers++;
else
_seenPeers++;
}
_curIndex++;
requeue(0);
}
}
void replyVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
getContext().statManager().addRateData("netDb.searchReplyValidated", 1, 0);
}
void replyNotVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
_invalidPeers++;
getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1, 0);
}
}
/** the peer gave us a reference to a new router, and we were able to fetch it */
class ReplyVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_replyJob = srj;
_key = key;
}
public String getName() { return "Search reply value verified"; }
public void runJob() { _replyJob.replyVerified(); }
}
/** the peer gave us a reference to a new router, and we were NOT able to fetch it */
class ReplyNotVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyNotVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_key = key;
_replyJob = srj;
}
public String getName() { return "Search reply value NOT verified"; }
public void runJob() { _replyJob.replyNotVerified(); }
}

View File

@@ -0,0 +1,165 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.RouterInfo;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
class SearchReplyJob extends JobImpl {
private DatabaseSearchReplyMessage _msg;
private Log _log;
/**
* Peer who we think sent us the reply. Note: could be spoofed! If the
* attacker knew we were searching for a particular key from a
* particular peer, they could send us some searchReply messages with
* shitty values, trying to get us to consider that peer unreliable.
* Potential fixes include either authenticated 'from' address or use a
* nonce in the search + searchReply (and check for it in the selector).
*
*/
private Hash _peer;
private int _curIndex;
private int _invalidPeers;
private int _seenPeers;
private int _newPeers;
private int _duplicatePeers;
private int _repliesPendingVerification;
private long _duration;
private SearchJob _searchJob;
public SearchReplyJob(RouterContext enclosingContext, SearchJob job, DatabaseSearchReplyMessage message, Hash peer, long duration) {
super(enclosingContext);
_log = enclosingContext.logManager().getLog(getClass());
_searchJob = job;
_msg = message;
_peer = peer;
_curIndex = 0;
_invalidPeers = 0;
_seenPeers = 0;
_newPeers = 0;
_duplicatePeers = 0;
_repliesPendingVerification = 0;
if (duration > 0)
_duration = duration;
else
_duration = 0;
}
public String getName() { return "Process Reply for Kademlia Search"; }
public void runJob() {
if (_curIndex >= _msg.getNumReplies()) {
if (_repliesPendingVerification > 0) {
// we received new references from the peer, but still
// haven't verified all of them, so let's give it more time
requeue(_searchJob.timeoutMs());
} else {
// either they didn't tell us anything new or we have verified
// (or failed to verify) all of them. we're done
getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
_invalidPeers, _duplicatePeers, _duration);
if (_newPeers > 0)
_searchJob.newPeersFound(_newPeers);
}
} else {
Hash peer = _msg.getReply(_curIndex);
boolean shouldAdd = false;
RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer);
if (info == null) {
// if the peer is giving us lots of bad peer references,
// dont try to fetch them.
boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer);
if (!sendsBadInfo) {
// we don't need to search for everything we're given here - only ones that
// are next in our search path...
// note: no need to think about shitlisted targets in the netdb search, given
// the floodfill's behavior
// This keeps us from continually chasing blocklisted floodfills
if (getContext().shitlist().isShitlisted(peer)) {
// if (_log.shouldLog(Log.INFO))
// _log.info("Not looking for a shitlisted peer...");
// getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
} else {
//getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs);
//_repliesPendingVerification++;
shouldAdd = true;
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64());
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
}
}
if (_searchJob.wasAttempted(peer)) {
_duplicatePeers++;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": dbSearchReply received on search referencing router " + peer);
if (shouldAdd) {
if (_searchJob.add(peer))
_newPeers++;
else
_seenPeers++;
}
_curIndex++;
requeue(0);
}
}
void replyVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
getContext().statManager().addRateData("netDb.searchReplyValidated", 1, 0);
}
void replyNotVerified() {
if (_log.shouldLog(Log.INFO))
_log.info("Peer reply from " + _peer.toBase64());
_repliesPendingVerification--;
_invalidPeers++;
getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1, 0);
}
}
/** the peer gave us a reference to a new router, and we were able to fetch it */
/***
class ReplyVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_replyJob = srj;
_key = key;
}
public String getName() { return "Search reply value verified"; }
public void runJob() { _replyJob.replyVerified(); }
}
***/
/** the peer gave us a reference to a new router, and we were NOT able to fetch it */
/***
class ReplyNotVerifiedJob extends JobImpl {
private Hash _key;
private SearchReplyJob _replyJob;
public ReplyNotVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) {
super(enclosingContext);
_key = key;
_replyJob = srj;
}
public String getName() { return "Search reply value NOT verified"; }
public void runJob() { _replyJob.replyNotVerified(); }
}
***/
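The control flow in this file is worth calling out: SearchReplyJob handles exactly one reply hash per invocation and then calls requeue(0), so a long DSRM never monopolizes the job queue. Reduced to its shape (a paraphrase of the runJob() above, with process() and finish() as stand-in names for the inline logic):

    // Shape of SearchReplyJob.runJob(); process()/finish() are stand-in names.
    public void runJob() {
        if (_curIndex >= _msg.getNumReplies()) {
            finish();                          // report stats, notify the search
        } else {
            process(_msg.getReply(_curIndex)); // examine one reply hash
            _curIndex++;
            requeue(0);                        // reschedule immediately for the next one
        }
    }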

View File

@@ -0,0 +1,34 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Ask the peer who sent us the DSRM for the RouterInfos.
* A simple version of SearchReplyJob in SearchJob.java.
* Skip the profile updates - this should be rare.
*
*/
class SingleLookupJob extends JobImpl {
private Log _log;
private DatabaseSearchReplyMessage _dsrm;
public SingleLookupJob(RouterContext ctx, DatabaseSearchReplyMessage dsrm) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_dsrm = dsrm;
}
public void runJob() {
Hash from = _dsrm.getFromHash();
for (int i = 0; i < _dsrm.getNumReplies(); i++) {
Hash peer = _dsrm.getReply(i);
if (peer.equals(getContext().routerHash())) // us
continue;
if (getContext().netDb().lookupRouterInfoLocally(peer) == null)
getContext().jobQueue().addJob(new SingleSearchJob(getContext(), peer, from));
}
}
public String getName() { return "NetDb process DSRM"; }
}
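Usage elsewhere in this commit: FloodOnlyLookupMatchJob queues this job when a search fails and the router knows too few floodfills:

    // From FloodOnlyLookupMatchJob.runJob() above:
    getContext().jobQueue().addJob(new SingleLookupJob(getContext(), _dsrm));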

View File

@@ -0,0 +1,51 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.router.OutNetMessage;
import net.i2p.router.TunnelInfo;
import net.i2p.util.Log;
/**
* Ask a single peer for a single key.
* This isn't really a flood-only search job at all, but we extend
* FloodOnlySearchJob so we can use the same selectors, etc.
*
*/
class SingleSearchJob extends FloodOnlySearchJob {
Hash _to;
OutNetMessage _onm;
public SingleSearchJob(RouterContext ctx, Hash key, Hash to) {
// warning, null FloodfillNetworkDatabaseFacade ...
// define our own failed() and success() below so _facade isn't used.
super(ctx, null, key, null, null, 5*1000, false);
_to = to;
}
public String getName() { return "NetDb search key from DSRM"; }
public boolean shouldProcessDSRM() { return false; } // don't loop
public void runJob() {
_onm = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
if ( (replyTunnel == null) || (outTunnel == null) ) {
failed();
return;
}
dlm.setFrom(replyTunnel.getPeer(0));
dlm.setMessageExpiration(getContext().clock().now()+5*1000);
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
dlm.setSearchKey(_key);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Single search for " + _key.toBase64() + " to " + _to.toBase64());
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), _to);
_lookupsRemaining = 1;
}
void failed() {
getContext().messageRegistry().unregisterPending(_onm);
}
void success() {}
}
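Putting the last two files together, the recovery chain wired by this commit is: a DSRM reply starts a SingleLookupJob, which fans out one SingleSearchJob per unknown reply hash:

    // From SingleLookupJob.runJob() above; peer is a reply hash, from is the DSRM sender.
    if (getContext().netDb().lookupRouterInfoLocally(peer) == null)
        getContext().jobQueue().addJob(new SingleSearchJob(getContext(), peer, from));
    // shouldProcessDSRM() returns false in SingleSearchJob, so the chain cannot recurse.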

View File

@@ -42,6 +42,7 @@ class StartExplorersJob extends JobImpl {
public String getName() { return "Start Explorers Job"; }
public void runJob() {
Set toExplore = selectKeysToExplore();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Keys to explore during this run: " + toExplore);
_facade.removeFromExploreKeys(toExplore);
for (Iterator iter = toExplore.iterator(); iter.hasNext(); ) {
@@ -83,6 +84,8 @@ class StartExplorersJob extends JobImpl {
*/
private Set selectKeysToExplore() {
Set queued = _facade.getExploreKeys();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Keys waiting for exploration: " + queued.size());
if (queued.size() <= MAX_PER_RUN)
return queued;
Set rv = new HashSet(MAX_PER_RUN);