Snark DHT:

- Combine getPeers and announce into a single method, as we must announce to
     the closest from the getPeers, not the closest from the kbuckets
   - Stop getPeers when nothing closer is found
This commit is contained in:
zzz
2013-10-09 21:09:34 +00:00
parent c8843a736d
commit 380783c1ba
3 changed files with 65 additions and 25 deletions

View File

@ -634,25 +634,16 @@ public class TrackerClient implements Runnable {
numwant = 1; numwant = 1;
else else
numwant = _util.getMaxConnections(); numwant = _util.getMaxConnections();
Collection<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 5*60*1000); Collection<Hash> hashes = dht.getPeersAndAnnounce(snark.getInfoHash(), numwant, 5*60*1000, 1, 3*60*1000);
if (!hashes.isEmpty()) { if (!hashes.isEmpty()) {
runStarted = true; runStarted = true;
lastDHTAnnounce = _util.getContext().clock().now();
rv = hashes.size(); rv = hashes.size();
} else {
lastDHTAnnounce = 0;
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got " + hashes + " from DHT"); _log.info("Got " + hashes + " from DHT");
// announce ourselves while the token is still good
// FIXME this needs to be in its own thread
if (!stop) {
// announce only to the 1 closest
int good = dht.announce(snark.getInfoHash(), 1, 5*60*1000);
if (_log.shouldLog(Log.INFO))
_log.info("Sent " + good + " good announces to DHT");
if (good > 0)
lastDHTAnnounce = _util.getContext().clock().now();
else
lastDHTAnnounce = 0;
}
// now try these peers // now try these peers
if ((!stop) && !hashes.isEmpty()) { if ((!stop) && !hashes.isEmpty()) {

View File

@ -36,16 +36,18 @@ public interface DHT {
public void ping(Destination dest, int port); public void ping(Destination dest, int port);
/** /**
* Get peers for a torrent. * Get peers for a torrent, and announce to the closest node we find.
* Blocking! * Blocking!
* Caller should run in a thread. * Caller should run in a thread.
* *
* @param ih the Info Hash (torrent) * @param ih the Info Hash (torrent)
* @param max maximum number of peers to return * @param max maximum number of peers to return
* @param maxWait the maximum time to wait (ms) must be > 0 * @param maxWait the maximum time to wait (ms) must be > 0
* @param annMax the number of peers to announce to
* @param maxWait the maximum total time to wait for announces, may be 0 to return immediately without waiting for acks
* @return possibly empty (never null) * @return possibly empty (never null)
*/ */
public Collection<Hash> getPeers(byte[] ih, int max, long maxWait); public Collection<Hash> getPeersAndAnnounce(byte[] ih, int max, long maxWait, int annMax, long annMaxWait);
/** /**
* Announce to ourselves. * Announce to ourselves.

View File

@ -308,7 +308,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} }
/** /**
* Get peers for a torrent. * Get peers for a torrent, and announce to the closest node we find.
* This is an iterative lookup in the DHT. * This is an iterative lookup in the DHT.
* Blocking! * Blocking!
* Caller should run in a thread. * Caller should run in a thread.
@ -316,9 +316,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @param ih the Info Hash (torrent) * @param ih the Info Hash (torrent)
* @param max maximum number of peers to return * @param max maximum number of peers to return
* @param maxWait the maximum time to wait (ms) must be > 0 * @param maxWait the maximum time to wait (ms) must be > 0
* @param annMax the number of peers to announce to
* @param maxWait the maximum total time to wait for announces, may be 0 to return immediately without waiting for acks
* @return possibly empty (never null) * @return possibly empty (never null)
*/ */
public Collection<Hash> getPeers(byte[] ih, int max, long maxWait) { public Collection<Hash> getPeersAndAnnounce(byte[] ih, int max, long maxWait, int annMax, long annMaxWait) {
// check local tracker first // check local tracker first
InfoHash iHash = new InfoHash(ih); InfoHash iHash = new InfoHash(ih);
Collection<Hash> rv = _tracker.getPeers(iHash, max); Collection<Hash> rv = _tracker.getPeers(iHash, max);
@ -333,9 +335,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
int maxNodes = 30; int maxNodes = 30;
// Initial set to try, will get added to as we go // Initial set to try, will get added to as we go
List<NodeInfo> nodes = _knownNodes.findClosest(iHash, maxNodes); List<NodeInfo> nodes = _knownNodes.findClosest(iHash, maxNodes);
SortedSet<NodeInfo> toTry = new TreeSet(new NodeInfoComparator(iHash)); NodeInfoComparator comp = new NodeInfoComparator(iHash);
SortedSet<NodeInfo> toTry = new TreeSet(comp);
SortedSet<NodeInfo> heardFrom = new TreeSet(comp);
toTry.addAll(nodes); toTry.addAll(nodes);
Set<NodeInfo> tried = new HashSet(); SortedSet<NodeInfo> tried = new TreeSet(comp);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Starting getPeers for " + iHash + " (b64: " + new NID(ih) + ") " + " with " + nodes.size() + " to try"); _log.info("Starting getPeers for " + iHash + " (b64: " + new NID(ih) + ") " + " with " + nodes.size() + " to try");
@ -372,20 +376,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Got pong"); _log.debug("Got pong");
} else if (replyType == REPLY_PEERS) { } else if (replyType == REPLY_PEERS) {
heardFrom.add(waiter.getSentTo());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Got peers"); _log.debug("Got peers");
List<Hash> reply = (List<Hash>) waiter.getReplyObject(); List<Hash> reply = (List<Hash>) waiter.getReplyObject();
// shouldn't send us an empty peers list but through
// 0.9.8.1 it will
if (!reply.isEmpty()) { if (!reply.isEmpty()) {
for (int j = 0; j < reply.size() && rv.size() < max; j++) { for (int j = 0; j < reply.size() && rv.size() < max; j++) {
Hash h = reply.get(j); Hash h = reply.get(j);
if (!h.equals(_myNodeInfo.getHash())) if (!h.equals(_myNodeInfo.getHash()))
rv.add(h); rv.add(h);
} }
if (_log.shouldLog(Log.INFO))
_log.info("Finished get Peers, got " + reply.size() + " from DHT, returning " + rv.size());
return rv;
} }
if (_log.shouldLog(Log.INFO))
_log.info("Finished get Peers, got " + reply.size() + " from DHT, returning " + rv.size());
break;
} else if (replyType == REPLY_NODES) { } else if (replyType == REPLY_NODES) {
heardFrom.add(waiter.getSentTo());
List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject(); List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Got " + reply.size() + " nodes"); _log.debug("Got " + reply.size() + " nodes");
@ -401,9 +409,45 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} }
if (_context.clock().now() > endTime) if (_context.clock().now() > endTime)
break; break;
if (!toTry.isEmpty() && !heardFrom.isEmpty() &&
comp.compare(toTry.first(), heardFrom.first()) >= 0) {
if (_log.shouldLog(Log.INFO))
_log.info("Finished get Peers, nothing closer to try after " + (i+1));
break;
}
}
// now announce
if (!heardFrom.isEmpty()) {
announce(ih);
// announce to the closest we've heard from
int annCnt = 0;
long start = _context.clock().now();
for (Iterator<NodeInfo> iter = heardFrom.iterator(); iter.hasNext() && annCnt < annMax && _isRunning; ) {
NodeInfo annTo = iter.next();
if (_log.shouldLog(Log.INFO))
_log.info("Announcing to closest from get peers: " + annTo);
long toWait = annMaxWait > 0 ? Math.min(annMaxWait, 60*1000) : 0;
if (announce(ih, annTo, toWait))
annCnt++;
if (annMaxWait > 0) {
annMaxWait -= _context.clock().now() - start;
if (annMaxWait < 1000)
break;
}
}
} else {
// spray it, but unlikely to work, we just went through the kbuckets,
// so this is essentially just a retry
if (_log.shouldLog(Log.INFO))
_log.info("Announcing to closest in kbuckets after get peers failed");
announce(ih, annMax, annMaxWait);
}
if (_log.shouldLog(Log.INFO)) {
_log.info("Finished get Peers, returning " + rv.size());
_log.info("Tried: " + tried);
_log.info("Heard from: " + heardFrom);
_log.info("Not tried: " + toTry);
} }
if (_log.shouldLog(Log.INFO))
_log.info("Finished get Peers, " + rv.size() + " from local and none from DHT");
return rv; return rv;
} }
@ -445,13 +489,16 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} }
/** /**
* Not recommended - use getPeersAndAnnounce().
*
* Announce to the closest peers in the local DHT. * Announce to the closest peers in the local DHT.
* This is NOT iterative - call getPeers() first to get the closest * This is NOT iterative - call getPeers() first to get the closest
* peers into the local DHT. * peers into the local DHT.
* Blocking unless maxWait <= 0 * Blocking unless maxWait <= 0
* Caller should run in a thread. * Caller should run in a thread.
* This also automatically announces ourself to our local tracker. * This also automatically announces ourself to our local tracker.
* For best results do a getPeers() first so we have tokens. * For best results do a getPeersAndAnnounce() instead, as this announces to
* the closest in the kbuckets, it does NOT sort through the known nodes hashmap.
* *
* @param ih the Info Hash (torrent) * @param ih the Info Hash (torrent)
* @param max maximum number of peers to announce to * @param max maximum number of peers to announce to