* do a db store after a successful db search (healing the netDb)
* timeout each peer in a db search after 10 seconds, not 30
* logging
Author: jrandom
Date:   2004-05-17 00:59:29 +00:00
Committed by: zzz
Parent: 1cf7dac82b
Commit: 07e79ce61a
6 changed files with 98 additions and 37 deletions
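
The first bullet is the substantive change: once a search succeeds, the router immediately re-stores the found entry to the closest peers that did not answer, so the next lookup for the same key can succeed on the first hop. Below is a minimal sketch of that idea only, with deliberately simplified stand-in types; NetDb, lookupLocally, closestPeers, and sendStore are illustrative names, not the router's real API (the actual classes are SearchJob and StoreJob in the diffs that follow).

    import java.util.List;
    import java.util.Set;

    // Stand-in types so the sketch compiles; the real router types are richer.
    interface Hash {}
    interface DataStructure {}
    interface NetDb {
        DataStructure lookupLocally(Hash key);        // leaseSet or routerInfo
        List<Hash> closestPeers(Hash key, int count); // Kademlia-closest peers
        void sendStore(Hash peer, Hash key, DataStructure data, long timeoutMs);
    }

    class HealingStoreSketch {
        /** After a successful search, push the data back toward the keyspace. */
        void onSearchSucceeded(NetDb netDb, Hash key, Set<Hash> alreadyHaveIt) {
            DataStructure data = netDb.lookupLocally(key); // just stored by the search
            if (data == null)
                return;                                    // healing is best-effort
            for (Hash peer : netDb.closestPeers(key, 3)) {
                if (alreadyHaveIt.contains(peer))
                    continue;                              // they answered the search
                netDb.sendStore(peer, key, data, 30 * 1000); // RESEND_TIMEOUT = 30s
            }
        }
    }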

View File

@@ -640,11 +640,12 @@ public class OutboundClientMessageJob extends JobImpl {
             long sendTime = _context.clock().now() - _status.getStart();
             boolean alreadySuccessful = _status.success();
             MessageId msgId = _status.getMessage().getMessageId();
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug(OutboundClientMessageJob.this.getJobId()
-                           + ": SUCCESS! Message delivered completely for message " + msgId
-                           + " after " + sendTime + "ms [for "
-                           + _status.getMessage().getMessageId() + "]");
+            if (_log.shouldLog(Log.INFO))
+                _log.info(OutboundClientMessageJob.this.getJobId()
+                          + ": SUCCESS! msg " + msgId
+                          + " sent after " + sendTime + "ms after "
+                          + _status.getNumLookups() + " lookups and "
+                          + _status.getNumSent() + " sends");
 
             if ( (_key != null) && (_tags != null) && (_tags.size() > 0) ) {
                 _context.sessionKeyManager().tagsDelivered(_status.getLeaseSet().getEncryptionKey(),

View File

@@ -57,7 +57,19 @@ class SearchJob extends JobImpl {
     private static final int SEARCH_BREDTH = 3; // 3 peers at a time
     private static final int SEARCH_PRIORITY = 400; // large because the search is probably for a real search
-    private static final long PER_PEER_TIMEOUT = 30*1000;
+    /**
+     * How long will we give each peer to reply to our search?
+     *
+     */
+    private static final long PER_PEER_TIMEOUT = 10*1000;
+
+    /**
+     * give ourselves 30 seconds to send out the value found to the closest
+     * peers /after/ we get a successful match. If this fails, no biggie, but
+     * this'll help heal the network so subsequent searches will find the data.
+     *
+     */
+    private static final long RESEND_TIMEOUT = 30*1000;
 
     /**
      * Create a new search for the routingKey specified
@@ -103,8 +115,8 @@ class SearchJob extends JobImpl {
      */
     protected void searchNext() {
         if (_state.completed()) {
-            if (_log.shouldLog(Log.INFO))
-                _log.info(getJobId() + ": Already completed");
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(getJobId() + ": Already completed");
             return;
         }
         if (_log.shouldLog(Log.INFO))
@@ -155,8 +167,7 @@ class SearchJob extends JobImpl {
             // too many already pending
             if (_log.shouldLog(Log.INFO))
                 _log.info(getJobId() + ": Too many searches already pending (pending: "
-                          + _state.getPending().size() + " max: " + getBredth() + ")",
-                          new Exception("too many pending"));
+                          + _state.getPending().size() + " max: " + getBredth() + ")");
             requeuePending();
             return;
         }
@@ -166,16 +177,14 @@ class SearchJob extends JobImpl {
             // we tried to find some peers, but there weren't any and no one else is going to answer
             if (_log.shouldLog(Log.INFO))
                 _log.info(getJobId() + ": No peers left, and none pending! Already searched: "
-                          + _state.getAttempted().size() + " failed: " + _state.getFailed().size(),
-                          new Exception("none left"));
+                          + _state.getAttempted().size() + " failed: " + _state.getFailed().size());
             fail();
         } else {
             // no more to try, but we might get data or close peers from some outstanding requests
             if (_log.shouldLog(Log.INFO))
                 _log.info(getJobId() + ": No peers left, but some are pending! Pending: "
                           + _state.getPending().size() + " attempted: " + _state.getAttempted().size()
-                          + " failed: " + _state.getFailed().size(),
-                          new Exception("none left, but pending"));
+                          + " failed: " + _state.getFailed().size());
             requeuePending();
             return;
         }
@@ -187,7 +196,7 @@ class SearchJob extends JobImpl {
         if ( (ds == null) || !(ds instanceof RouterInfo) ) {
             if (_log.shouldLog(Log.WARN))
                 _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! "
-                          + peer + " : " + ds);
+                          + peer + " : " + (ds == null ? "null" : ds.getClass().getName()));
         } else {
             sendSearch((RouterInfo)ds);
         }
@@ -237,7 +246,7 @@ class SearchJob extends JobImpl {
             return;
         } else {
             if (_log.shouldLog(Log.DEBUG))
-                _log.debug(getJobId() + ": Send search to " + router);
+                _log.debug(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64());
         }
 
         if (_isLease || false) // moo
@@ -424,8 +433,8 @@ class SearchJob extends JobImpl {
         if (_state.wasAttempted(ri.getIdentity().getHash())) {
             _duplicatePeers++;
         }
-        if (_log.shouldLog(Log.INFO))
-            _log.info(getJobId() + ": dbSearchReply received on search containing router "
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(getJobId() + ": dbSearchReply received on search containing router "
                       + ri.getIdentity().getHash() + " with publishDate of "
                       + new Date(ri.getPublished()));
         _facade.store(ri.getIdentity().getHash(), ri);
@@ -435,8 +444,7 @@ class SearchJob extends JobImpl {
             _seenPeers++;
         } else {
             if (_log.shouldLog(Log.ERROR))
-                _log.error(getJobId() + ": Received an invalid peer from " + _peer + ": "
-                           + ri, new Exception("Invalid peer"));
+                _log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " + ri);
             _invalidPeers++;
         }
         _curIndex++;
@@ -485,9 +493,10 @@ class SearchJob extends JobImpl {
     /**
      * Search was totally successful
      */
-    protected void succeed() {
+    private void succeed() {
         if (_log.shouldLog(Log.INFO))
-            _log.info(getJobId() + ": Succeeded search for key " + _state.getTarget());
+            _log.info(getJobId() + ": Succeeded search for key " + _state.getTarget()
+                      + " after querying " + _state.getAttempted().size());
 
         if (_log.shouldLog(Log.DEBUG))
             _log.debug(getJobId() + ": State of successful search: " + _state);
@@ -498,6 +507,23 @@ class SearchJob extends JobImpl {
         }
         if (_onSuccess != null)
             _context.jobQueue().addJob(_onSuccess);
+        resend();
+    }
+
+    /**
+     * After we get the data we were searching for, rebroadcast it to the peers
+     * we would query first if we were to search for it again (healing the network).
+     *
+     */
+    private void resend() {
+        DataStructure ds = _facade.lookupLeaseSetLocally(_state.getTarget());
+        if (ds == null)
+            ds = _facade.lookupRouterInfoLocally(_state.getTarget());
+        if (ds != null)
+            _context.jobQueue().addJob(new StoreJob(_context, _facade, _state.getTarget(),
+                                                    ds, null, null, RESEND_TIMEOUT,
+                                                    _state.getSuccessful()));
     }
 
     /**
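
Besides the resend() healing above, this file also cuts PER_PEER_TIMEOUT from 30 to 10 seconds. With SEARCH_BREDTH fixed at 3 parallel queries, an unresponsive peer now blocks one of those three slots for a third as long before being marked failed and replaced by the next closest peer. A rough, illustrative sketch of that bookkeeping (not the real SearchJob; names and types are simplified):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Why the shorter timeout matters: stalled peers free up a query slot sooner.
    class SearchTimeoutSketch {
        static final int SEARCH_BREDTH = 3;             // spelling as in the source
        static final long PER_PEER_TIMEOUT = 10 * 1000; // was 30*1000

        private final Map<String, Long> pending = new HashMap<>(); // peer -> deadline
        private final Set<String> failed = new HashSet<>();

        boolean canQueryAnotherPeer() {
            return pending.size() < SEARCH_BREDTH;
        }

        void querySent(String peer, long now) {
            pending.put(peer, now + PER_PEER_TIMEOUT);
        }

        /** Called periodically: expire peers that blew their 10s budget. */
        void expireStalled(long now) {
            pending.entrySet().removeIf(e -> {
                boolean expired = e.getValue() <= now;
                if (expired)
                    failed.add(e.getKey()); // free the slot for the next closest peer
                return expired;
            });
        }
    }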

View File

@@ -36,12 +36,16 @@ class SearchMessageSelector implements MessageSelector {
         _log.debug("[" + _id + "] Created: " + toString());
     }
 
-    public String toString() { return "Search selector [" + _id + "] looking for a reply from " + _peer + " with regards to " + _state.getTarget(); }
+    public String toString() {
+        return "Search selector [" + _id + "] looking for a reply from " + _peer
+               + " with regards to " + _state.getTarget();
+    }
 
     public boolean continueMatching() {
         if (_found) {
             if (_log.shouldLog(Log.DEBUG))
-                _log.debug("[" + _id + "] Dont continue matching! looking for a reply from " + _peer + " with regards to " + _state.getTarget());
+                _log.debug("[" + _id + "] Dont continue matching! looking for a reply from "
+                           + _peer + " with regards to " + _state.getTarget());
             return false;
         }
         long now = _context.clock().now();
@@ -50,12 +54,16 @@ class SearchMessageSelector implements MessageSelector {
     public long getExpiration() { return _exp; }
     public boolean isMatch(I2NPMessage message) {
         if (_log.shouldLog(Log.DEBUG))
-            _log.debug("[" + _id + "] isMatch("+message.getClass().getName() + ") [want dbStore or dbSearchReply from " + _peer + " for " + _state.getTarget() + "]");
+            _log.debug("[" + _id + "] isMatch("+message.getClass().getName()
+                       + ") [want dbStore or dbSearchReply from " + _peer
+                       + " for " + _state.getTarget() + "]");
         if (message instanceof DatabaseStoreMessage) {
             DatabaseStoreMessage msg = (DatabaseStoreMessage)message;
             if (msg.getKey().equals(_state.getTarget())) {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("[" + _id + "] Was a DBStore of the key we're looking for. May not have been from who we're checking against though, but DBStore doesn't include that info");
+                    _log.debug("[" + _id + "] Was a DBStore of the key we're looking for. "
+                               + "May not have been from who we're checking against though, "
+                               + "but DBStore doesn't include that info");
                 _found = true;
                 return true;
             } else {
@@ -68,17 +76,20 @@ class SearchMessageSelector implements MessageSelector {
             if (_peer.equals(msg.getFromHash())) {
                 if (msg.getSearchKey().equals(_state.getTarget())) {
                     if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("[" + _id + "] Was a DBSearchReply from who we're checking with for a key we're looking for");
+                        _log.debug("[" + _id + "] Was a DBSearchReply from who we're "
+                                   + "checking with for a key we're looking for");
                     _found = true;
                     return true;
                 } else {
                     if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("[" + _id + "] Was a DBSearchReply from who we're checking with but NOT for the key we're looking for");
+                        _log.debug("[" + _id + "] Was a DBSearchReply from who we're checking "
+                                   + "with but NOT for the key we're looking for");
                     return false;
                 }
             } else {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("[" + _id + "] DBSearchReply from someone we are not checking with [" + msg.getFromHash() + ", not " + _state.getTarget() + "]");
+                    _log.debug("[" + _id + "] DBSearchReply from someone we are not checking with ["
+                               + msg.getFromHash() + ", not " + _state.getTarget() + "]");
                 return false;
             }
         } else {
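
The selector's matching rules, restated: a DatabaseStoreMessage matches whenever its key is the search target (the store message carries no sender hash, so the replying peer cannot be verified), while a DatabaseSearchReplyMessage must both come from the peer being waited on and name the target key. A condensed sketch of that decision table, using simplified stand-in message types rather than the real I2NP classes:

    // Stand-in message types; the real ones are I2NP messages with richer fields.
    interface Msg {}
    record DbStore(String key) implements Msg {}
    record DbSearchReply(String fromPeer, String searchKey) implements Msg {}

    class SelectorSketch {
        private final String peer;   // who we queried
        private final String target; // what we asked for

        SelectorSketch(String peer, String target) {
            this.peer = peer;
            this.target = target;
        }

        boolean isMatch(Msg m) {
            if (m instanceof DbStore s)
                return s.key().equals(target);       // sender unverifiable: key match is enough
            if (m instanceof DbSearchReply r)
                return r.fromPeer().equals(peer)     // must be the peer we asked...
                    && r.searchKey().equals(target); // ...about the key we asked for
            return false;
        }
    }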

View File

@@ -129,29 +129,33 @@ class SearchState {
             buf.append(" completed? false ");
         else
             buf.append(" completed on ").append(new Date(_completed));
-        buf.append(" Attempted: ");
+        buf.append("\n\tAttempted: ");
         synchronized (_attemptedPeers) {
+            buf.append(_attemptedPeers.size()).append(' ');
             for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) {
                 Hash peer = (Hash)iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
-        buf.append(" Pending: ");
+        buf.append("\n\tPending: ");
         synchronized (_pendingPeers) {
+            buf.append(_pendingPeers.size()).append(' ');
             for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) {
                 Hash peer = (Hash)iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
-        buf.append(" Failed: ");
+        buf.append("\n\tFailed: ");
         synchronized (_failedPeers) {
+            buf.append(_failedPeers.size()).append(' ');
             for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) {
                 Hash peer = (Hash)iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
-        buf.append(" Successful: ");
+        buf.append("\n\tSuccessful: ");
         synchronized (_successfulPeers) {
+            buf.append(_successfulPeers.size()).append(' ');
             for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) {
                 Hash peer = (Hash)iter.next();
                 buf.append(peer.toBase64()).append(" ");

View File

@@ -25,7 +25,9 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
     private KademliaNetworkDatabaseFacade _facade;
     private SearchJob _job;
 
-    public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer, SearchState state, KademliaNetworkDatabaseFacade facade, SearchJob job) {
+    public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer,
+                                     SearchState state, KademliaNetworkDatabaseFacade facade,
+                                     SearchJob job) {
         super(context);
         _log = context.logManager().getLog(SearchUpdateReplyFoundJob.class);
         _peer = peer.getIdentity().getHash();
@@ -37,7 +39,8 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
     public String getName() { return "Update Reply Found for Kademlia Search"; }
     public void runJob() {
         if (_log.shouldLog(Log.INFO))
-            _log.info(getJobId() + ": Reply from " + _peer + " with message " + _message.getClass().getName());
+            _log.info(getJobId() + ": Reply from " + _peer.toBase64()
+                      + " with message " + _message.getClass().getName());
 
         if (_message instanceof DatabaseStoreMessage) {
             long timeToReply = _state.dataFound(_peer);
@@ -47,7 +50,9 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
             _facade.store(msg.getKey(), msg.getLeaseSet());
         } else if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) {
             if (_log.shouldLog(Log.INFO))
-                _log.info(getJobId() + ": dbStore received on search containing router " + msg.getKey() + " with publishDate of " + new Date(msg.getRouterInfo().getPublished()));
+                _log.info(getJobId() + ": dbStore received on search containing router "
+                          + msg.getKey() + " with publishDate of "
+                          + new Date(msg.getRouterInfo().getPublished()));
             _facade.store(msg.getKey(), msg.getRouterInfo());
         } else {
             if (_log.shouldLog(Log.ERROR))

View File

@@ -81,11 +81,20 @@ class StoreJob extends JobImpl {
      */
     public StoreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key,
                     DataStructure data, Job onSuccess, Job onFailure, long timeoutMs) {
+        this(context, facade, key, data, onSuccess, onFailure, timeoutMs, null);
+    }
+
+    /**
+     * @param toSkip set of peer hashes of people we dont want to send the data to (e.g. we
+     *               already know they have it). This can be null.
+     */
+    public StoreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key,
+                    DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set toSkip) {
         super(context);
         _log = context.logManager().getLog(StoreJob.class);
         _context.statManager().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         _facade = facade;
-        _state = new StoreState(key, data);
+        _state = new StoreState(key, data, toSkip);
         _onSuccess = onSuccess;
         _onFailure = onFailure;
         _timeoutMs = timeoutMs;
@@ -385,11 +394,16 @@ class StoreJob extends JobImpl {
         private volatile long _started;
 
         public StoreState(Hash key, DataStructure data) {
+            this(key, data, null);
+        }
+        public StoreState(Hash key, DataStructure data, Set toSkip) {
             _key = key;
             _data = data;
             _pendingPeers = new HashSet(16);
             _pendingPeerTimes = new HashMap(16);
             _attemptedPeers = new HashSet(16);
+            if (toSkip != null)
+                _attemptedPeers.addAll(toSkip);
             _failedPeers = new HashSet(16);
             _successfulPeers = new HashSet(16);
             _successfulExploratoryPeers = new HashSet(16);
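
The StoreJob change is a classic telescoping-constructor refactor: the old signature delegates to a new one with toSkip = null, so existing callers are untouched, and the skip set is implemented by pre-seeding _attemptedPeers, so the existing peer-selection loop never re-picks a peer that is already known to have the data. The same pattern in isolation, with String standing in for Hash (a minimal illustration, not the real class):

    import java.util.HashSet;
    import java.util.Set;

    class StoreStateSketch {
        private final String key;
        private final Set<String> attemptedPeers = new HashSet<>();

        StoreStateSketch(String key) {
            this(key, null);                   // old entry point keeps working
        }

        StoreStateSketch(String key, Set<String> toSkip) {
            this.key = key;
            if (toSkip != null)
                attemptedPeers.addAll(toSkip); // "already has it" == "already attempted"
        }

        /** The send loop checks this; pre-skipped peers look like prior attempts. */
        boolean wasAttempted(String peer) {
            return attemptedPeers.contains(peer);
        }
    }

With this in place, SearchJob.resend() can call the eight-argument StoreJob constructor with _state.getSuccessful() as toSkip, which is exactly what the resend() hunk earlier in this commit does.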