* i2psnark:

- Fix bug preventing completion announcement, broken in 0.9.1
   - Fix setting short retry time after initial announce failure
   - Fix DHT announce and getPeers
   - Fix DHT warning message
   - log tweaks
This commit is contained in:
zzz
2012-08-15 12:44:46 +00:00
parent 44edf70842
commit 523d39b3bb
6 changed files with 56 additions and 44 deletions

View File

@ -828,7 +828,7 @@ public class SnarkManager implements Snark.CompleteListener {
} else if (_util.shouldUseOpenTrackers() && _util.getOpenTrackers() != null) { } else if (_util.shouldUseOpenTrackers() && _util.getOpenTrackers() != null) {
addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers and DHT only.", info.getName())); addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers and DHT only.", info.getName()));
//addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers only.", info.getName())); //addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers only.", info.getName()));
} else if (_util.getDHT() != null) { } else if (_util.shouldUseDHT()) {
addMessage(_("Warning - No I2P trackers in \"{0}\", and open trackers are disabled, will announce to DHT only.", info.getName())); addMessage(_("Warning - No I2P trackers in \"{0}\", and open trackers are disabled, will announce to DHT only.", info.getName()));
} else { } else {
addMessage(_("Warning - No I2P trackers in \"{0}\", and DHT and open trackers are disabled, you should enable open trackers or DHT before starting the torrent.", info.getName())); addMessage(_("Warning - No I2P trackers in \"{0}\", and DHT and open trackers are disabled, you should enable open trackers or DHT before starting the torrent.", info.getName()));

View File

@ -27,6 +27,7 @@ import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
@ -100,6 +101,7 @@ public class TrackerClient implements Runnable {
// these 2 used in loop() // these 2 used in loop()
private volatile boolean runStarted; private volatile boolean runStarted;
private volatile int consecutiveFails; private volatile int consecutiveFails;
private boolean completed;
private volatile boolean _fastUnannounce; private volatile boolean _fastUnannounce;
private final List<Tracker> trackers; private final List<Tracker> trackers;
@ -218,11 +220,16 @@ public class TrackerClient implements Runnable {
try { try {
if (!_initialized) { if (!_initialized) {
setup(); setup();
// FIXME dht }
if (trackers.isEmpty()) { if (trackers.isEmpty() && _util.getDHT() == null) {
stop = true; stop = true;
return; this.snark.addMessage(_util.getString("No valid trackers for {0} - enable opentrackers or DHT?",
} this.snark.getBaseName()));
_log.error("No valid trackers for " + this.snark.getBaseName());
this.snark.stopTorrent();
return;
}
if (!_initialized) {
_initialized = true; _initialized = true;
// FIXME only when starting everybody at once, not for a single torrent // FIXME only when starting everybody at once, not for a single torrent
long delay = I2PAppContext.getGlobalContext().random().nextInt(30*1000); long delay = I2PAppContext.getGlobalContext().random().nextInt(30*1000);
@ -296,15 +303,7 @@ public class TrackerClient implements Runnable {
_log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash); _log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash);
} }
} }
this.completed = coordinator.getLeft() == 0;
if (trackers.isEmpty() && _util.getDHT() == null) {
stop = true;
this.snark.addMessage(_util.getString("No valid trackers for {0} - enable opentrackers or DHT?",
this.snark.getBaseName()));
_log.error("No valid trackers for " + this.snark.getBaseName());
this.snark.stopTorrent();
return;
}
} }
/** /**
@ -331,7 +330,6 @@ public class TrackerClient implements Runnable {
long uploaded = coordinator.getUploaded(); long uploaded = coordinator.getUploaded();
long downloaded = coordinator.getDownloaded(); long downloaded = coordinator.getDownloaded();
long left = coordinator.getLeft(); // -1 in magnet mode long left = coordinator.getLeft(); // -1 in magnet mode
boolean completed = (left == 0);
// First time we got a complete download? // First time we got a complete download?
String event; String event;
@ -345,8 +343,7 @@ public class TrackerClient implements Runnable {
// *** loop once for each tracker // *** loop once for each tracker
int maxSeenPeers = 0; int maxSeenPeers = 0;
for (Iterator iter = trackers.iterator(); iter.hasNext(); ) { for (Tracker tr : trackers) {
Tracker tr = (Tracker)iter.next();
if ((!stop) && (!tr.stop) && if ((!stop) && (!tr.stop) &&
(completed || coordinator.needOutboundPeers() || !tr.started) && (completed || coordinator.needOutboundPeers() || !tr.started) &&
(event.equals(COMPLETED_EVENT) || System.currentTimeMillis() > tr.lastRequestTime + tr.interval)) (event.equals(COMPLETED_EVENT) || System.currentTimeMillis() > tr.lastRequestTime + tr.interval))
@ -468,7 +465,9 @@ public class TrackerClient implements Runnable {
numwant = 1; numwant = 1;
else else
numwant = _util.getMaxConnections(); numwant = _util.getMaxConnections();
List<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 2*60*1000); Collection<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 2*60*1000);
if (!hashes.isEmpty())
runStarted = true;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got " + hashes + " from DHT"); _log.info("Got " + hashes + " from DHT");
// announce ourselves while the token is still good // announce ourselves while the token is still good
@ -509,10 +508,6 @@ public class TrackerClient implements Runnable {
if (stop) if (stop)
return; return;
if (!runStarted)
if (_log.shouldLog(Log.DEBUG))
_log.debug(" Retrying in one minute...");
try { try {
// Sleep some minutes... // Sleep some minutes...
// Sleep the minimum interval for all the trackers, but 60s minimum // Sleep the minimum interval for all the trackers, but 60s minimum
@ -522,6 +517,8 @@ public class TrackerClient implements Runnable {
delay = 3*SLEEP*60*1000 + random; delay = 3*SLEEP*60*1000 + random;
else if (snark.getTrackerProblems() != null && ++consecutiveFails < MAX_CONSEC_FAILS) else if (snark.getTrackerProblems() != null && ++consecutiveFails < MAX_CONSEC_FAILS)
delay = INITIAL_SLEEP; delay = INITIAL_SLEEP;
else if ((!runStarted) && _runCount < MAX_CONSEC_FAILS)
delay = INITIAL_SLEEP;
else else
// sleep a while, when we wake up we will contact only the trackers whose intervals have passed // sleep a while, when we wake up we will contact only the trackers whose intervals have passed
delay = SLEEP*60*1000 + random; delay = SLEEP*60*1000 + random;

View File

@ -4,7 +4,7 @@ package org.klomp.snark.dht;
* GPLv2 * GPLv2
*/ */
import java.util.List; import java.util.Collection;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.Hash; import net.i2p.data.Hash;
@ -42,9 +42,9 @@ public interface DHT {
* @param ih the Info Hash (torrent) * @param ih the Info Hash (torrent)
* @param max maximum number of peers to return * @param max maximum number of peers to return
* @param maxWait the maximum time to wait (ms) must be > 0 * @param maxWait the maximum time to wait (ms) must be > 0
* @return list or empty list (never null) * @return possibly empty (never null)
*/ */
public List<Hash> getPeers(byte[] ih, int max, long maxWait); public Collection<Hash> getPeers(byte[] ih, int max, long maxWait);
/** /**
* Announce to ourselves. * Announce to ourselves.

View File

@ -299,25 +299,28 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @param ih the Info Hash (torrent) * @param ih the Info Hash (torrent)
* @param max maximum number of peers to return * @param max maximum number of peers to return
* @param maxWait the maximum time to wait (ms) must be > 0 * @param maxWait the maximum time to wait (ms) must be > 0
* @return list or empty list (never null) * @return possibly empty (never null)
*/ */
public List<Hash> getPeers(byte[] ih, int max, long maxWait) { public Collection<Hash> getPeers(byte[] ih, int max, long maxWait) {
// check local tracker first // check local tracker first
InfoHash iHash = new InfoHash(ih); InfoHash iHash = new InfoHash(ih);
List<Hash> rv = _tracker.getPeers(iHash, max); Collection<Hash> rv = _tracker.getPeers(iHash, max);
rv.remove(_myNodeInfo.getHash()); rv.remove(_myNodeInfo.getHash());
if (!rv.isEmpty()) if (rv.size() >= max)
return rv; // TODO get DHT too? return rv;
rv = new HashSet(rv);
long endTime = _context.clock().now() + maxWait;
// Initial set to try, will get added to as we go // Initial set to try, will get added to as we go
List<NodeInfo> nodes = _knownNodes.findClosest(iHash, max); int maxNodes = 12;
List<NodeInfo> nodes = _knownNodes.findClosest(iHash, maxNodes);
SortedSet<NodeInfo> toTry = new TreeSet(new NodeInfoComparator(iHash)); SortedSet<NodeInfo> toTry = new TreeSet(new NodeInfoComparator(iHash));
toTry.addAll(nodes); toTry.addAll(nodes);
Set<NodeInfo> tried = new HashSet(); Set<NodeInfo> tried = new HashSet();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Starting getPeers with " + nodes.size() + " to try"); _log.info("Starting getPeers for " + iHash + " with " + nodes.size() + " to try");
for (int i = 0; i < max; i++) { for (int i = 0; i < maxNodes; i++) {
if (!_isRunning) if (!_isRunning)
break; break;
NodeInfo nInfo; NodeInfo nInfo;
@ -334,7 +337,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
continue; continue;
synchronized(waiter) { synchronized(waiter) {
try { try {
waiter.wait(maxWait); waiter.wait(Math.max(20*1000, (Math.min(40*1000, endTime - _context.clock().now()))));
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
@ -350,9 +353,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_log.info("Got peers"); _log.info("Got peers");
List<Hash> reply = (List<Hash>) waiter.getReplyObject(); List<Hash> reply = (List<Hash>) waiter.getReplyObject();
if (!reply.isEmpty()) { if (!reply.isEmpty()) {
for (int j = 0; j < reply.size() && rv.size() < max; j++) {
rv.add(reply.get(j));
}
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Finished get Peers, returning " + reply.size()); _log.info("Finished get Peers, got " + rv.size() + " from DHT, returning " + reply.size());
return reply; return rv;
} }
} else if (replyType == REPLY_NODES) { } else if (replyType == REPLY_NODES) {
List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject(); List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject();
@ -366,10 +372,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject()); _log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject());
} }
if (_context.clock().now() > endTime)
break;
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Finished get Peers, fail"); _log.info("Finished get Peers, " + rv.size() + " from local and none from DHT");
return Collections.EMPTY_LIST; return rv;
} }
/** /**
@ -465,7 +473,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (maxWait <= 0) if (maxWait <= 0)
return false; return false;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("No token for announce to " + nInfo + " sending get_peers first"); _log.info("No token for announce to " + nInfo + ", sending get_peers first");
ReplyWaiter waiter = sendGetPeers(nInfo, iHash); ReplyWaiter waiter = sendGetPeers(nInfo, iHash);
if (waiter == null) if (waiter == null)
return false; return false;
@ -478,20 +486,20 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
int replyType = waiter.getReplyCode(); int replyType = waiter.getReplyCode();
if (!(replyType == REPLY_PEERS || replyType == REPLY_NODES)) { if (!(replyType == REPLY_PEERS || replyType == REPLY_NODES)) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Get_peers failed to " + nInfo); _log.info("Get_peers in announce() failed to " + nInfo);
return false; return false;
} }
// we should have a token now // we should have a token now
token = _incomingTokens.get(nInfo.getNID()); token = _incomingTokens.get(nInfo.getNID());
if (token == null) { if (token == null) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Huh? no token after get_peers succeeded to " + nInfo); _log.info("Huh? no token after get_peers in announce() succeeded to " + nInfo);
return false; return false;
} }
maxWait -= _context.clock().now() - start; maxWait -= _context.clock().now() - start;
if (maxWait < 1000) { if (maxWait < 1000) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Ran out of time after get_peers succeeded to " + nInfo); _log.info("Ran out of time after get_peers in announce() succeeded to " + nInfo);
return false; return false;
} }
} }

View File

@ -1,3 +1,10 @@
2012-08-15 zzz
* i2psnark:
- Fix bug preventing completion announcement, broken in 0.9.1
- Fix setting short retry time after initial announce failure
- Fix DHT announce and getPeers
- Fix DHT warning message
2012-08-13 zzz 2012-08-13 zzz
* SSU EstablishmentManager: * SSU EstablishmentManager:
- Remove use of outbound timers in EstablishmentManager; drive all events in Establisher thread - Remove use of outbound timers in EstablishmentManager; drive all events in Establisher thread

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 9; public final static long BUILD = 10;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";