* i2psnark:

- Store seed/leech status in DHT tracker (ticket #1280)
   - Increase max received DHT nodes (Vuze sends more than K)
   - Recognize not-registered message from diftracker
   - Fix bug in DHT unannounce()
This commit is contained in:
zzz
2014-06-01 17:13:00 +00:00
parent 633b71ba19
commit b97a53177e
8 changed files with 103 additions and 44 deletions

View File

@ -221,7 +221,8 @@ class PeerCheckerTask implements Runnable
peer.keepAlive();
// announce them to local tracker (TrackerClient does this too)
if (dht != null && (_runCount % 5) == 0) {
dht.announce(coordinator.getInfoHash(), peer.getPeerID().getDestHash());
dht.announce(coordinator.getInfoHash(), peer.getPeerID().getDestHash(),
peer.isCompleted());
}
}
@ -270,7 +271,7 @@ class PeerCheckerTask implements Runnable
// announce ourselves to local tracker (TrackerClient does this too)
if (dht != null && (_runCount % 16) == 0) {
dht.announce(coordinator.getInfoHash());
dht.announce(coordinator.getInfoHash(), coordinator.completed());
}
}
}

View File

@ -71,6 +71,7 @@ public class TrackerClient implements Runnable {
private static final String COMPLETED_EVENT = "completed";
private static final String STOPPED_EVENT = "stopped";
private static final String NOT_REGISTERED = "torrent not registered"; //bytemonsoon
private static final String NOT_REGISTERED_2 = "torrent not found"; // diftracker
/** this is our equivalent to router.utorrent.com for bootstrap */
private static final String DEFAULT_BACKUP_TRACKER = "http://tracker.welterde.i2p/a";
@ -109,6 +110,8 @@ public class TrackerClient implements Runnable {
// these 2 used in loop()
private volatile boolean runStarted;
private volatile int consecutiveFails;
// if we don't want anything else.
// Not necessarily seeding, as we may have skipped some files.
private boolean completed;
private volatile boolean _fastUnannounce;
private long lastDHTAnnounce;
@ -391,7 +394,7 @@ public class TrackerClient implements Runnable {
// Local DHT tracker announce
DHT dht = _util.getDHT();
if (dht != null && (meta == null || !meta.isPrivate()))
dht.announce(snark.getInfoHash());
dht.announce(snark.getInfoHash(), coordinator.completed());
int oldSeenPeers = snark.getTrackerSeenPeers();
int maxSeenPeers = 0;
@ -539,7 +542,8 @@ public class TrackerClient implements Runnable {
DHT dht = _util.getDHT();
if (dht != null) {
for (Peer peer : peers) {
dht.announce(snark.getInfoHash(), peer.getPeerID().getDestHash());
dht.announce(snark.getInfoHash(), peer.getPeerID().getDestHash(),
false); // TODO actual seed/leech status
}
}
@ -572,13 +576,15 @@ public class TrackerClient implements Runnable {
// don't show secondary tracker problems to the user
if (tr.isPrimary)
snark.setTrackerProblems(tr.trackerProblems);
if (tr.trackerProblems.toLowerCase(Locale.US).startsWith(NOT_REGISTERED)) {
String tplc = tr.trackerProblems.toLowerCase(Locale.US);
if (tplc.startsWith(NOT_REGISTERED) || tplc.startsWith(NOT_REGISTERED_2)) {
// Give a guy some time to register it if using opentrackers too
//if (trckrs.size() == 1) {
// stop = true;
// snark.stopTorrent();
//} else { // hopefully each on the opentrackers list is really open
if (tr.registerFails++ > MAX_REGISTER_FAILS)
if (tr.registerFails++ > MAX_REGISTER_FAILS ||
(!tr.isPrimary && tr.registerFails > MAX_REGISTER_FAILS / 2))
tr.stop = true;
//
}
@ -654,7 +660,7 @@ public class TrackerClient implements Runnable {
numwant = _util.getMaxConnections();
Collection<Hash> hashes = dht.getPeersAndAnnounce(snark.getInfoHash(), numwant,
5*60*1000, DHT_ANNOUNCE_PEERS, 3*60*1000,
coordinator.completed());
coordinator.completed(), numwant <= 1);
if (!hashes.isEmpty()) {
runStarted = true;
lastDHTAnnounce = _util.getContext().clock().now();

View File

@ -46,11 +46,12 @@ public interface DHT {
* @param annMax the number of peers to announce to
* @param annMaxWait the maximum total time to wait for announces, may be 0 to return immediately without waiting for acks
* @param isSeed true if seed, false if leech
* @param noSeeds true if we do not want seeds in the result
* @return possibly empty (never null)
*/
public Collection<Hash> getPeersAndAnnounce(byte[] ih, int max, long maxWait,
int annMax, long annMaxWait,
boolean isSeed);
boolean isSeed, boolean noSeeds);
/**
* Announce to ourselves.
@ -58,7 +59,7 @@ public interface DHT {
*
* @param ih the Info Hash (torrent)
*/
public void announce(byte[] ih);
public void announce(byte[] ih, boolean isSeed);
/**
* Announce somebody else we know about to ourselves.
@ -67,7 +68,7 @@ public interface DHT {
* @param ih the Info Hash (torrent)
* @param peerHash the peer's Hash
*/
public void announce(byte[] ih, byte[] peerHash);
public void announce(byte[] ih, byte[] peerHash, boolean isSeed);
/**
* Remove reference to ourselves in the local tracker.

View File

@ -60,7 +60,7 @@ class DHTTracker {
_isRunning = false;
}
void announce(InfoHash ih, Hash hash) {
void announce(InfoHash ih, Hash hash, boolean isSeed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Announce " + hash + " for " + ih);
Peers peers = _torrents.get(ih);
@ -79,6 +79,9 @@ class DHTTracker {
if (peer2 != null)
peer = peer2;
peer.setLastSeen(_context.clock().now());
// don't let false trump true, as not all sources know the seed status
if (isSeed)
peer.setSeed(true);
} else {
// We could update setLastSeen if he is already
// in there, but that would tend to keep
@ -94,26 +97,42 @@ class DHTTracker {
Peers peers = _torrents.get(ih);
if (peers == null)
return;
Peer peer = new Peer(hash.getData());
peers.remove(peer);
peers.remove(hash);
}
/**
* Caller's responsibility to remove himself from the list
*
* @param noSeeds true if we do not want seeds in the result
* @return list or empty list (never null)
*/
List<Hash> getPeers(InfoHash ih, int max) {
List<Hash> getPeers(InfoHash ih, int max, boolean noSeeds) {
Peers peers = _torrents.get(ih);
if (peers == null)
if (peers == null || max <= 0)
return Collections.emptyList();
int size = peers.size();
List<Hash> rv = new ArrayList<Hash>(peers.values());
if (max < size) {
Collections.shuffle(rv, _context.random());
List<Peer> rv = new ArrayList<Peer>(peers.values());
int size = rv.size();
if (max < size)
Collections.shuffle(rv, _context.random());
if (noSeeds) {
int i = 0;
for (Iterator<Peer> iter = rv.iterator(); iter.hasNext(); ) {
if (iter.next().isSeed())
iter.remove();
else if (++i >= max)
break;
}
if (max < rv.size())
rv = rv.subList(0, max);
} else {
if (max < size)
rv = rv.subList(0, max);
}
return rv;
// a Peer is a Hash
List rv1 = rv;
List<Hash> rv2 = rv1;
return rv2;
}
/**

View File

@ -317,14 +317,15 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @param annMax the number of peers to announce to
* @param annMaxWait the maximum total time to wait for announces, may be 0 to return immediately without waiting for acks
* @param isSeed true if seed, false if leech
* @param noSeeds true if we do not want seeds in the result
* @return possibly empty (never null)
*/
public Collection<Hash> getPeersAndAnnounce(byte[] ih, int max, long maxWait,
int annMax, long annMaxWait,
boolean isSeed) {
boolean isSeed, boolean noSeeds) {
// check local tracker first
InfoHash iHash = new InfoHash(ih);
Collection<Hash> rv = _tracker.getPeers(iHash, max);
Collection<Hash> rv = _tracker.getPeers(iHash, max, noSeeds);
rv.remove(_myNodeInfo.getHash());
if (rv.size() >= max)
return rv;
@ -360,7 +361,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Try " + i + ": " + nInfo);
ReplyWaiter waiter = sendGetPeers(nInfo, iHash);
ReplyWaiter waiter = sendGetPeers(nInfo, iHash, noSeeds);
if (waiter == null)
continue;
synchronized(waiter) {
@ -419,7 +420,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
}
// now announce
if (!heardFrom.isEmpty()) {
announce(ih);
announce(ih, isSeed);
// announce to the closest we've heard from
int annCnt = 0;
long start = _context.clock().now();
@ -458,9 +459,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
*
* @param ih the Info Hash (torrent)
*/
public void announce(byte[] ih) {
public void announce(byte[] ih, boolean isSeed) {
InfoHash iHash = new InfoHash(ih);
_tracker.announce(iHash, _myNodeInfo.getHash());
_tracker.announce(iHash, _myNodeInfo.getHash(), isSeed);
}
/**
@ -470,9 +471,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @param ih the Info Hash (torrent)
* @param peerHash the peer's Hash
*/
public void announce(byte[] ih, byte[] peerHash) {
public void announce(byte[] ih, byte[] peerHash, boolean isSeed) {
InfoHash iHash = new InfoHash(ih);
_tracker.announce(iHash, new Hash(peerHash));
_tracker.announce(iHash, new Hash(peerHash), isSeed);
// Do NOT do this, corrupts the Hash cache and the Peer ID
//_tracker.announce(iHash, Hash.create(peerHash));
}
@ -508,7 +509,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @return the number of successful announces, not counting ourselves.
*/
public int announce(byte[] ih, int max, long maxWait, boolean isSeed) {
announce(ih);
announce(ih, isSeed);
int rv = 0;
long start = _context.clock().now();
InfoHash iHash = new InfoHash(ih);
@ -555,7 +556,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
return false;
if (_log.shouldLog(Log.INFO))
_log.info("No token for announce to " + nInfo + ", sending get_peers first");
ReplyWaiter waiter = sendGetPeers(nInfo, iHash);
ReplyWaiter waiter = sendGetPeers(nInfo, iHash, false);
if (waiter == null)
return false;
long start = _context.clock().now();
@ -728,15 +729,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* Blocking if we have to look up the dest for the nodeinfo
*
* @param nInfo who to send it to
* @param noSeeds true if we do not want seeds in the result
* @return null on error
*/
private ReplyWaiter sendGetPeers(NodeInfo nInfo, InfoHash ih) {
private ReplyWaiter sendGetPeers(NodeInfo nInfo, InfoHash ih, boolean noSeeds) {
if (_log.shouldLog(Log.INFO))
_log.info("Sending get peers of " + ih + " to: " + nInfo);
_log.info("Sending get peers of " + ih + " to: " + nInfo + " noseeds? " + noSeeds);
Map<String, Object> map = new HashMap<String, Object>();
map.put("q", "get_peers");
Map<String, Object> args = new HashMap<String, Object>();
args.put("info_hash", ih.getData());
if (noSeeds)
args.put("noseed", Integer.valueOf(1));
map.put("a", args);
ReplyWaiter rv = sendQuery(nInfo, map, true);
// save the InfoHash so we can get it later
@ -754,7 +758,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
*/
private ReplyWaiter sendAnnouncePeer(NodeInfo nInfo, InfoHash ih, Token token, boolean isSeed) {
if (_log.shouldLog(Log.INFO))
_log.info("Sending announce of " + ih + " to: " + nInfo);
_log.info("Sending announce of " + ih + " to: " + nInfo + " seed? " + isSeed);
Map<String, Object> map = new HashMap<String, Object>();
map.put("q", "announce_peer");
Map<String, Object> args = new HashMap<String, Object>();
@ -1126,14 +1130,22 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} else if (method.equals("get_peers")) {
byte[] hash = args.get("info_hash").getBytes();
InfoHash ih = new InfoHash(hash);
receiveGetPeers(msgID, nInfo, ih);
boolean noSeeds = false;
BEValue nos = args.get("noseed");
if (nos != null)
noSeeds = nos.getInt() == 1;
receiveGetPeers(msgID, nInfo, ih, noSeeds);
} else if (method.equals("announce_peer")) {
byte[] hash = args.get("info_hash").getBytes();
InfoHash ih = new InfoHash(hash);
// this is the "TCP" port, we don't care
//int port = args.get("port").getInt();
byte[] token = args.get("token").getBytes();
receiveAnnouncePeer(msgID, ih, token);
boolean isSeed = false;
BEValue iss = args.get("seed");
if (iss != null)
isSeed = iss.getInt() == 1;
receiveAnnouncePeer(msgID, ih, token, isSeed);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown query method rcvd: " + method);
@ -1246,16 +1258,17 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
/**
* Handle and respond to the query
*/
private void receiveGetPeers(MsgID msgID, NodeInfo nInfo, InfoHash ih) throws InvalidBEncodingException {
private void receiveGetPeers(MsgID msgID, NodeInfo nInfo,
InfoHash ih, boolean noSeeds) throws InvalidBEncodingException {
if (_log.shouldLog(Log.INFO))
_log.info("Rcvd get_peers from: " + nInfo + " for: " + ih);
_log.info("Rcvd get_peers from: " + nInfo + " for: " + ih + " noseeds? " + noSeeds);
// generate and save random token
Token token = new Token(_context);
_outgoingTokens.put(token, nInfo);
if (_log.shouldLog(Log.INFO))
_log.info("Stored new OB token: " + token + " for: " + nInfo);
List<Hash> peers = _tracker.getPeers(ih, MAX_WANT);
List<Hash> peers = _tracker.getPeers(ih, MAX_WANT, noSeeds);
// Check this before removing him, so we don't needlessly send nodes
// if he's the only one on the torrent.
boolean noPeers = peers.isEmpty();
@ -1290,7 +1303,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* We have no node info here, it came on response port, we have to get it from the token.
* So we can't verify that it came from the same peer, as BEP 5 specifies.
*/
private void receiveAnnouncePeer(MsgID msgID, InfoHash ih, byte[] tok) throws InvalidBEncodingException {
private void receiveAnnouncePeer(MsgID msgID, InfoHash ih,
byte[] tok, boolean isSeed) throws InvalidBEncodingException {
Token token = new Token(tok);
NodeInfo nInfo = _outgoingTokens.get(token);
if (nInfo == null) {
@ -1301,9 +1315,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Rcvd announce from: " + nInfo + " for: " + ih);
_log.info("Rcvd announce from: " + nInfo + " for: " + ih + " seed? " + isSeed);
_tracker.announce(ih, nInfo.getHash());
_tracker.announce(ih, nInfo.getHash(), isSeed);
// the reply for an announce is the same as the reply for a ping
sendPong(nInfo, msgID);
}
@ -1362,7 +1376,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
* @throws NPE, IllegalArgumentException, and others too
*/
private List<NodeInfo> receiveNodes(NodeInfo nInfo, byte[] ids) throws InvalidBEncodingException {
int max = Math.min(K, ids.length / NodeInfo.LENGTH);
// Azureus sends 20
int max = Math.min(3 * K, ids.length / NodeInfo.LENGTH);
List<NodeInfo> rv = new ArrayList<NodeInfo>(max);
for (int off = 0; off < ids.length && rv.size() < max; off += NodeInfo.LENGTH) {
NodeInfo nInf = new NodeInfo(ids, off);

View File

@ -14,7 +14,9 @@ import net.i2p.data.Hash;
*/
class Peer extends Hash {
private long lastSeen;
private volatile long lastSeen;
// todo we could pack this into the upper bit of lastSeen
private volatile boolean isSeed;
public Peer(byte[] data) {
super(data);
@ -27,4 +29,14 @@ class Peer extends Hash {
public void setLastSeen(long now) {
lastSeen = now;
}
/** @since 0.9.14 */
public boolean isSeed() {
return isSeed;
}
/** @since 0.9.14 */
public void setSeed(boolean isSeed) {
this.isSeed = isSeed;
}
}

View File

@ -1,3 +1,8 @@
2014-06-01 zzz
* i2psnark:
- Store seed/leech status in DHT tracker (ticket #1280)
- Increase max received DHT nodes (Vuze sends more)
2014-05-31 zzz
Prop from i2p.i2p.zzz.test2:
* Console: Fix shutdown error on old wrappers (ticket #1285)

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 2;
public final static long BUILD = 3;
/** for example "-test" */
public final static String EXTRA = "";