forked from I2P_Developers/i2p.i2p
allow some overrides that the ExploreJob needs
logging
This commit is contained in:
@ -43,7 +43,7 @@ import net.i2p.router.RouterContext;
|
|||||||
*/
|
*/
|
||||||
class SearchJob extends JobImpl {
|
class SearchJob extends JobImpl {
|
||||||
private Log _log;
|
private Log _log;
|
||||||
private KademliaNetworkDatabaseFacade _facade;
|
protected KademliaNetworkDatabaseFacade _facade;
|
||||||
private SearchState _state;
|
private SearchState _state;
|
||||||
private Job _onSuccess;
|
private Job _onSuccess;
|
||||||
private Job _onFailure;
|
private Job _onFailure;
|
||||||
@ -54,8 +54,8 @@ class SearchJob extends JobImpl {
|
|||||||
private Job _pendingRequeueJob;
|
private Job _pendingRequeueJob;
|
||||||
private PeerSelector _peerSelector;
|
private PeerSelector _peerSelector;
|
||||||
|
|
||||||
public final static int SEARCH_BREDTH = 3; // 3 peers at a time
|
private static final int SEARCH_BREDTH = 3; // 3 peers at a time
|
||||||
public final static int SEARCH_PRIORITY = 400; // large because the search is probably for a real search
|
private static final int SEARCH_PRIORITY = 400; // large because the search is probably for a real search
|
||||||
|
|
||||||
private static final long PER_PEER_TIMEOUT = 30*1000;
|
private static final long PER_PEER_TIMEOUT = 30*1000;
|
||||||
|
|
||||||
@ -65,7 +65,8 @@ class SearchJob extends JobImpl {
|
|||||||
*/
|
*/
|
||||||
public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) {
|
public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) {
|
||||||
super(context);
|
super(context);
|
||||||
if ( (key == null) || (key.getData() == null) ) throw new IllegalArgumentException("Search for null key? wtf");
|
if ( (key == null) || (key.getData() == null) )
|
||||||
|
throw new IllegalArgumentException("Search for null key? wtf");
|
||||||
_log = _context.logManager().getLog(SearchJob.class);
|
_log = _context.logManager().getLog(SearchJob.class);
|
||||||
_facade = facade;
|
_facade = facade;
|
||||||
_state = new SearchState(_context, key);
|
_state = new SearchState(_context, key);
|
||||||
@ -80,6 +81,8 @@ class SearchJob extends JobImpl {
|
|||||||
_context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
|
_context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
|
||||||
_context.statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
|
_context.statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
|
||||||
_context.statManager().createRateStat("netDb.failedPeers", "How many peers are contacted in a failed search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
|
_context.statManager().createRateStat("netDb.failedPeers", "How many peers are contacted in a failed search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
@ -130,6 +133,9 @@ class SearchJob extends JobImpl {
|
|||||||
return _context.clock().now() >= _expiration;
|
return _context.clock().now() >= _expiration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** max # of concurrent searches */
|
||||||
|
protected int getBredth() { return SEARCH_BREDTH; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a series of searches to the next available peers as selected by
|
* Send a series of searches to the next available peers as selected by
|
||||||
* the routing table, but making sure no more than SEARCH_BREDTH are outstanding
|
* the routing table, but making sure no more than SEARCH_BREDTH are outstanding
|
||||||
@ -142,12 +148,12 @@ class SearchJob extends JobImpl {
|
|||||||
_log.debug(getJobId() + ": Search already completed", new Exception("already completed"));
|
_log.debug(getJobId() + ": Search already completed", new Exception("already completed"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int toCheck = SEARCH_BREDTH - _state.getPending().size();
|
int toCheck = getBredth() - _state.getPending().size();
|
||||||
if (toCheck <= 0) {
|
if (toCheck <= 0) {
|
||||||
// too many already pending
|
// too many already pending
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.warn(getJobId() + ": Too many searches already pending (pending: "
|
_log.info(getJobId() + ": Too many searches already pending (pending: "
|
||||||
+ _state.getPending().size() + " max: " + SEARCH_BREDTH + ")",
|
+ _state.getPending().size() + " max: " + getBredth() + ")",
|
||||||
new Exception("too many pending"));
|
new Exception("too many pending"));
|
||||||
requeuePending();
|
requeuePending();
|
||||||
return;
|
return;
|
||||||
@ -156,15 +162,15 @@ class SearchJob extends JobImpl {
|
|||||||
if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
|
if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
|
||||||
if (_state.getPending().size() <= 0) {
|
if (_state.getPending().size() <= 0) {
|
||||||
// we tried to find some peers, but there weren't any and no one else is going to answer
|
// we tried to find some peers, but there weren't any and no one else is going to answer
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.warn(getJobId() + ": No peers left, and none pending! Already searched: "
|
_log.info(getJobId() + ": No peers left, and none pending! Already searched: "
|
||||||
+ _state.getAttempted().size() + " failed: " + _state.getFailed().size(),
|
+ _state.getAttempted().size() + " failed: " + _state.getFailed().size(),
|
||||||
new Exception("none left"));
|
new Exception("none left"));
|
||||||
fail();
|
fail();
|
||||||
} else {
|
} else {
|
||||||
// no more to try, but we might get data or close peers from some outstanding requests
|
// no more to try, but we might get data or close peers from some outstanding requests
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.warn(getJobId() + ": No peers left, but some are pending! Pending: "
|
_log.info(getJobId() + ": No peers left, but some are pending! Pending: "
|
||||||
+ _state.getPending().size() + " attempted: " + _state.getAttempted().size()
|
+ _state.getPending().size() + " attempted: " + _state.getAttempted().size()
|
||||||
+ " failed: " + _state.getFailed().size(),
|
+ " failed: " + _state.getFailed().size(),
|
||||||
new Exception("none left, but pending"));
|
new Exception("none left, but pending"));
|
||||||
@ -375,6 +381,15 @@ class SearchJob extends JobImpl {
|
|||||||
_context.jobQueue().addJob(new SearchReplyJob((DatabaseSearchReplyMessage)message, peer, duration));
|
_context.jobQueue().addJob(new SearchReplyJob((DatabaseSearchReplyMessage)message, peer, duration));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We've gotten a search reply that contained the specified
|
||||||
|
* number of peers that we didn't know about before.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
protected void newPeersFound(int numNewPeers) {
|
||||||
|
// noop
|
||||||
|
}
|
||||||
|
|
||||||
private final class SearchReplyJob extends JobImpl {
|
private final class SearchReplyJob extends JobImpl {
|
||||||
private DatabaseSearchReplyMessage _msg;
|
private DatabaseSearchReplyMessage _msg;
|
||||||
private Hash _peer;
|
private Hash _peer;
|
||||||
@ -399,6 +414,8 @@ class SearchJob extends JobImpl {
|
|||||||
if (_curIndex >= _msg.getNumReplies()) {
|
if (_curIndex >= _msg.getNumReplies()) {
|
||||||
_context.profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
|
_context.profileManager().dbLookupReply(_peer, _newPeers, _seenPeers,
|
||||||
_invalidPeers, _duplicatePeers, _duration);
|
_invalidPeers, _duplicatePeers, _duration);
|
||||||
|
if (_newPeers > 0)
|
||||||
|
newPeersFound(_newPeers);
|
||||||
} else {
|
} else {
|
||||||
RouterInfo ri = _msg.getReply(_curIndex);
|
RouterInfo ri = _msg.getReply(_curIndex);
|
||||||
if (ri.isValid()) {
|
if (ri.isValid()) {
|
||||||
|
Reference in New Issue
Block a user