diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java index ac488ed58..638d65d55 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java @@ -640,11 +640,12 @@ public class OutboundClientMessageJob extends JobImpl { long sendTime = _context.clock().now() - _status.getStart(); boolean alreadySuccessful = _status.success(); MessageId msgId = _status.getMessage().getMessageId(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(OutboundClientMessageJob.this.getJobId() - + ": SUCCESS! Message delivered completely for message " + msgId - + " after " + sendTime + "ms [for " - + _status.getMessage().getMessageId() + "]"); + if (_log.shouldLog(Log.INFO)) + _log.info(OutboundClientMessageJob.this.getJobId() + + ": SUCCESS! msg " + msgId + + " sent after " + sendTime + "ms after " + + _status.getNumLookups() + " lookups and " + + _status.getNumSent() + " sends"); if ( (_key != null) && (_tags != null) && (_tags.size() > 0) ) { _context.sessionKeyManager().tagsDelivered(_status.getLeaseSet().getEncryptionKey(), diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index d5e2cc302..cfddf3a6a 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -57,7 +57,19 @@ class SearchJob extends JobImpl { private static final int SEARCH_BREDTH = 3; // 3 peers at a time private static final int SEARCH_PRIORITY = 400; // large because the search is probably for a real search - private static final long PER_PEER_TIMEOUT = 30*1000; + /** + * How long will we give each peer to reply to our search? + * + */ + private static final long PER_PEER_TIMEOUT = 10*1000; + + /** + * give ourselves 30 seconds to send out the value found to the closest + * peers /after/ we get a successful match. If this fails, no biggie, but + * this'll help heal the network so subsequent searches will find the data. + * + */ + private static final long RESEND_TIMEOUT = 30*1000; /** * Create a new search for the routingKey specified @@ -103,8 +115,8 @@ class SearchJob extends JobImpl { */ protected void searchNext() { if (_state.completed()) { - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Already completed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Already completed"); return; } if (_log.shouldLog(Log.INFO)) @@ -155,8 +167,7 @@ class SearchJob extends JobImpl { // too many already pending if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Too many searches already pending (pending: " - + _state.getPending().size() + " max: " + getBredth() + ")", - new Exception("too many pending")); + + _state.getPending().size() + " max: " + getBredth() + ")"); requeuePending(); return; } @@ -166,16 +177,14 @@ class SearchJob extends JobImpl { // we tried to find some peers, but there weren't any and no one else is going to answer if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": No peers left, and none pending! Already searched: " - + _state.getAttempted().size() + " failed: " + _state.getFailed().size(), - new Exception("none left")); + + _state.getAttempted().size() + " failed: " + _state.getFailed().size()); fail(); } else { // no more to try, but we might get data or close peers from some outstanding requests if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": No peers left, but some are pending! Pending: " + _state.getPending().size() + " attempted: " + _state.getAttempted().size() - + " failed: " + _state.getFailed().size(), - new Exception("none left, but pending")); + + " failed: " + _state.getFailed().size()); requeuePending(); return; } @@ -187,7 +196,7 @@ class SearchJob extends JobImpl { if ( (ds == null) || !(ds instanceof RouterInfo) ) { if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " - + peer + " : " + ds); + + peer + " : " + (ds == null ? "null" : ds.getClass().getName())); } else { sendSearch((RouterInfo)ds); } @@ -237,7 +246,7 @@ class SearchJob extends JobImpl { return; } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Send search to " + router); + _log.debug(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64()); } if (_isLease || false) // moo @@ -424,8 +433,8 @@ class SearchJob extends JobImpl { if (_state.wasAttempted(ri.getIdentity().getHash())) { _duplicatePeers++; } - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": dbSearchReply received on search containing router " + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": dbSearchReply received on search containing router " + ri.getIdentity().getHash() + " with publishDate of " + new Date(ri.getPublished())); _facade.store(ri.getIdentity().getHash(), ri); @@ -435,8 +444,7 @@ class SearchJob extends JobImpl { _seenPeers++; } else { if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " - + ri, new Exception("Invalid peer")); + _log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " + ri); _invalidPeers++; } _curIndex++; @@ -485,9 +493,10 @@ class SearchJob extends JobImpl { /** * Search was totally successful */ - protected void succeed() { + private void succeed() { if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Succeeded search for key " + _state.getTarget()); + _log.info(getJobId() + ": Succeeded search for key " + _state.getTarget() + + " after querying " + _state.getAttempted().size()); if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": State of successful search: " + _state); @@ -498,6 +507,23 @@ class SearchJob extends JobImpl { } if (_onSuccess != null) _context.jobQueue().addJob(_onSuccess); + + resend(); + } + + /** + * After we get the data we were searching for, rebroadcast it to the peers + * we would query first if we were to search for it again (healing the network). + * + */ + private void resend() { + DataStructure ds = _facade.lookupLeaseSetLocally(_state.getTarget()); + if (ds == null) + ds = _facade.lookupRouterInfoLocally(_state.getTarget()); + if (ds != null) + _context.jobQueue().addJob(new StoreJob(_context, _facade, _state.getTarget(), + ds, null, null, RESEND_TIMEOUT, + _state.getSuccessful())); } /** diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchMessageSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchMessageSelector.java index 8dce57170..d72589f02 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchMessageSelector.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchMessageSelector.java @@ -36,12 +36,16 @@ class SearchMessageSelector implements MessageSelector { _log.debug("[" + _id + "] Created: " + toString()); } - public String toString() { return "Search selector [" + _id + "] looking for a reply from " + _peer + " with regards to " + _state.getTarget(); } + public String toString() { + return "Search selector [" + _id + "] looking for a reply from " + _peer + + " with regards to " + _state.getTarget(); + } public boolean continueMatching() { if (_found) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("[" + _id + "] Dont continue matching! looking for a reply from " + _peer + " with regards to " + _state.getTarget()); + _log.debug("[" + _id + "] Dont continue matching! looking for a reply from " + + _peer + " with regards to " + _state.getTarget()); return false; } long now = _context.clock().now(); @@ -50,12 +54,16 @@ class SearchMessageSelector implements MessageSelector { public long getExpiration() { return _exp; } public boolean isMatch(I2NPMessage message) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("[" + _id + "] isMatch("+message.getClass().getName() + ") [want dbStore or dbSearchReply from " + _peer + " for " + _state.getTarget() + "]"); + _log.debug("[" + _id + "] isMatch("+message.getClass().getName() + + ") [want dbStore or dbSearchReply from " + _peer + + " for " + _state.getTarget() + "]"); if (message instanceof DatabaseStoreMessage) { DatabaseStoreMessage msg = (DatabaseStoreMessage)message; if (msg.getKey().equals(_state.getTarget())) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("[" + _id + "] Was a DBStore of the key we're looking for. May not have been from who we're checking against though, but DBStore doesn't include that info"); + _log.debug("[" + _id + "] Was a DBStore of the key we're looking for. " + + "May not have been from who we're checking against though, " + + "but DBStore doesn't include that info"); _found = true; return true; } else { @@ -68,17 +76,20 @@ class SearchMessageSelector implements MessageSelector { if (_peer.equals(msg.getFromHash())) { if (msg.getSearchKey().equals(_state.getTarget())) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("[" + _id + "] Was a DBSearchReply from who we're checking with for a key we're looking for"); + _log.debug("[" + _id + "] Was a DBSearchReply from who we're " + + "checking with for a key we're looking for"); _found = true; return true; } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("[" + _id + "] Was a DBSearchReply from who we're checking with but NOT for the key we're looking for"); + _log.debug("[" + _id + "] Was a DBSearchReply from who we're checking " + + "with but NOT for the key we're looking for"); return false; } } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("[" + _id + "] DBSearchReply from someone we are not checking with [" + msg.getFromHash() + ", not " + _state.getTarget() + "]"); + _log.debug("[" + _id + "] DBSearchReply from someone we are not checking with [" + + msg.getFromHash() + ", not " + _state.getTarget() + "]"); return false; } } else { diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java index fd3ad4774..cf28d09a0 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java @@ -129,29 +129,33 @@ class SearchState { buf.append(" completed? false "); else buf.append(" completed on ").append(new Date(_completed)); - buf.append(" Attempted: "); + buf.append("\n\tAttempted: "); synchronized (_attemptedPeers) { + buf.append(_attemptedPeers.size()).append(' '); for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) { Hash peer = (Hash)iter.next(); buf.append(peer.toBase64()).append(" "); } } - buf.append(" Pending: "); + buf.append("\n\tPending: "); synchronized (_pendingPeers) { + buf.append(_pendingPeers.size()).append(' '); for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) { Hash peer = (Hash)iter.next(); buf.append(peer.toBase64()).append(" "); } } - buf.append(" Failed: "); + buf.append("\n\tFailed: "); synchronized (_failedPeers) { + buf.append(_failedPeers.size()).append(' '); for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) { Hash peer = (Hash)iter.next(); buf.append(peer.toBase64()).append(" "); } } - buf.append(" Successful: "); + buf.append("\n\tSuccessful: "); synchronized (_successfulPeers) { + buf.append(_successfulPeers.size()).append(' '); for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) { Hash peer = (Hash)iter.next(); buf.append(peer.toBase64()).append(" "); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java index 6317d78ed..bfa2a718a 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java @@ -25,7 +25,9 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob { private KademliaNetworkDatabaseFacade _facade; private SearchJob _job; - public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer, SearchState state, KademliaNetworkDatabaseFacade facade, SearchJob job) { + public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer, + SearchState state, KademliaNetworkDatabaseFacade facade, + SearchJob job) { super(context); _log = context.logManager().getLog(SearchUpdateReplyFoundJob.class); _peer = peer.getIdentity().getHash(); @@ -37,7 +39,8 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob { public String getName() { return "Update Reply Found for Kademlia Search"; } public void runJob() { if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Reply from " + _peer + " with message " + _message.getClass().getName()); + _log.info(getJobId() + ": Reply from " + _peer.toBase64() + + " with message " + _message.getClass().getName()); if (_message instanceof DatabaseStoreMessage) { long timeToReply = _state.dataFound(_peer); @@ -47,7 +50,9 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob { _facade.store(msg.getKey(), msg.getLeaseSet()); } else if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) { if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": dbStore received on search containing router " + msg.getKey() + " with publishDate of " + new Date(msg.getRouterInfo().getPublished())); + _log.info(getJobId() + ": dbStore received on search containing router " + + msg.getKey() + " with publishDate of " + + new Date(msg.getRouterInfo().getPublished())); _facade.store(msg.getKey(), msg.getRouterInfo()); } else { if (_log.shouldLog(Log.ERROR)) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index 5dce76e07..d41b8e24d 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -81,11 +81,20 @@ class StoreJob extends JobImpl { */ public StoreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, DataStructure data, Job onSuccess, Job onFailure, long timeoutMs) { + this(context, facade, key, data, onSuccess, onFailure, timeoutMs, null); + } + + /** + * @param toSkip set of peer hashes of people we dont want to send the data to (e.g. we + * already know they have it). This can be null. + */ + public StoreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, + DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set toSkip) { super(context); _log = context.logManager().getLog(StoreJob.class); _context.statManager().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); _facade = facade; - _state = new StoreState(key, data); + _state = new StoreState(key, data, toSkip); _onSuccess = onSuccess; _onFailure = onFailure; _timeoutMs = timeoutMs; @@ -385,11 +394,16 @@ class StoreJob extends JobImpl { private volatile long _started; public StoreState(Hash key, DataStructure data) { + this(key, data, null); + } + public StoreState(Hash key, DataStructure data, Set toSkip) { _key = key; _data = data; _pendingPeers = new HashSet(16); _pendingPeerTimes = new HashMap(16); _attemptedPeers = new HashSet(16); + if (toSkip != null) + _attemptedPeers.addAll(toSkip); _failedPeers = new HashSet(16); _successfulPeers = new HashSet(16); _successfulExploratoryPeers = new HashSet(16);