i2psnark: Periodically connect out to other seeds to fetch comments (ticket #2288)

javadoc updates
This commit is contained in:
zzz
2020-04-18 15:04:43 +00:00
parent 823dc72eaa
commit bf425d8ac9

View File

@ -30,7 +30,6 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -43,6 +42,7 @@ import net.i2p.data.Destination;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.RandomSource;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
import org.klomp.snark.bencode.BEValue; import org.klomp.snark.bencode.BEValue;
@ -145,7 +145,13 @@ class PeerCoordinator implements PeerListener
private final MagnetState magnetState; private final MagnetState magnetState;
private final CoordinatorListener listener; private final CoordinatorListener listener;
private final I2PSnarkUtil _util; private final I2PSnarkUtil _util;
private final Random _random; private final RandomSource _random;
private final AtomicLong _commentsLastRequested = new AtomicLong();
private final AtomicInteger _commentsNotRequested = new AtomicInteger();
private static final long COMMENT_REQ_INTERVAL = 12*60*60*1000L;
private static final long COMMENT_REQ_DELAY = 60*60*1000L;
private static final int MAX_COMMENT_NOT_REQ = 10;
/** /**
* @param metainfo null if in magnet mode * @param metainfo null if in magnet mode
@ -176,6 +182,9 @@ class PeerCoordinator implements PeerListener
// this will help the behavior with global limits // this will help the behavior with global limits
timer = new CheckEvent(_util.getContext(), new PeerCheckerTask(_util, this)); timer = new CheckEvent(_util.getContext(), new PeerCheckerTask(_util, this));
timer.schedule((CHECK_PERIOD / 2) + _random.nextInt((int) CHECK_PERIOD)); timer.schedule((CHECK_PERIOD / 2) + _random.nextInt((int) CHECK_PERIOD));
// we don't store the last-requested time, so just delay a random amount
_commentsLastRequested.set(util.getContext().clock().now() - (COMMENT_REQ_INTERVAL - _random.nextLong(COMMENT_REQ_DELAY)));
} }
/** /**
@ -329,7 +338,8 @@ class PeerCoordinator implements PeerListener
} }
/** /**
* Returns the 4-minute-average rate in Bps * Returns the average rate in Bps
* over last RATE_DEPTH * CHECK_PERIOD seconds
*/ */
public long getDownloadRate() public long getDownloadRate()
{ {
@ -338,6 +348,10 @@ class PeerCoordinator implements PeerListener
return getRate(downloaded_old); return getRate(downloaded_old);
} }
/**
* Returns the average rate in Bps
* over last RATE_DEPTH * CHECK_PERIOD seconds
*/
public long getUploadRate() public long getUploadRate()
{ {
if (halted) if (halted)
@ -345,6 +359,10 @@ class PeerCoordinator implements PeerListener
return getRate(uploaded_old); return getRate(uploaded_old);
} }
/**
* Returns the rate in Bps
* over last complete CHECK_PERIOD seconds
*/
public long getCurrentUploadRate() public long getCurrentUploadRate()
{ {
if (halted) if (halted)
@ -404,7 +422,12 @@ class PeerCoordinator implements PeerListener
public boolean needOutboundPeers() { public boolean needOutboundPeers() {
//return wantedBytes != 0 && needPeers(); //return wantedBytes != 0 && needPeers();
// minus two to make it a little easier for new peers to get in on large swarms // minus two to make it a little easier for new peers to get in on large swarms
return wantedBytes != 0 && return (wantedBytes != 0 ||
(_util.utCommentsEnabled() &&
// we should also check SnarkManager.getSavedCommentsEnabled() for this torrent,
// but that reads in the config file, there's no caching.
// TODO
_commentsLastRequested.get() < _util.getContext().clock().now() - COMMENT_REQ_INTERVAL)) &&
!halted && !halted &&
peers.size() < getMaxConnections() - 2 && peers.size() < getMaxConnections() - 2 &&
(storage == null || !storage.isChecking()); (storage == null || !storage.isChecking());
@ -617,6 +640,8 @@ class PeerCoordinator implements PeerListener
bitfield = storage.getBitField(); bitfield = storage.getBitField();
else else
bitfield = null; bitfield = null;
if (!peer.isIncoming() && wantedBytes == 0 && _log.shouldInfo())
_log.info("Outbound connection as seed to get comments for " + snark.getBaseName() + " to " + peer);
// if we aren't a seed but we don't want any more // if we aren't a seed but we don't want any more
final boolean partialComplete = wantedBytes == 0 && bitfield != null && !bitfield.complete(); final boolean partialComplete = wantedBytes == 0 && bitfield != null && !bitfield.complete();
Runnable r = new Runnable() Runnable r = new Runnable()
@ -1447,11 +1472,17 @@ class PeerCoordinator implements PeerListener
*/ */
void sendCommentReq(Peer peer) { void sendCommentReq(Peer peer) {
Map<String, BEValue> handshake = peer.getHandshakeMap(); Map<String, BEValue> handshake = peer.getHandshakeMap();
if (handshake == null) if (handshake == null) {
if (wantedBytes == 0 && _commentsNotRequested.incrementAndGet() >= MAX_COMMENT_NOT_REQ)
_commentsLastRequested.set(_util.getContext().clock().now());
return; return;
}
BEValue bev = handshake.get("m"); BEValue bev = handshake.get("m");
if (bev == null) if (bev == null) {
if (wantedBytes == 0 && _commentsNotRequested.incrementAndGet() >= MAX_COMMENT_NOT_REQ)
_commentsLastRequested.set(_util.getContext().clock().now());
return; return;
}
// TODO if peer hasn't been connected very long, don't bother // TODO if peer hasn't been connected very long, don't bother
// unless forced at handshake time (see above) // unless forced at handshake time (see above)
try { try {
@ -1463,9 +1494,16 @@ class PeerCoordinator implements PeerListener
sz = comments.size(); sz = comments.size();
} }
} }
_commentsNotRequested.set(0);
_commentsLastRequested.set(_util.getContext().clock().now());
if (sz >= CommentSet.MAX_SIZE) if (sz >= CommentSet.MAX_SIZE)
return; return;
ExtensionHandler.sendCommentReq(peer, CommentSet.MAX_SIZE - sz); ExtensionHandler.sendCommentReq(peer, CommentSet.MAX_SIZE - sz);
} else {
// failsafe to prevent seed excessively connecting out to a swarm for comments
// when nobody in the swarm supports comments
if (wantedBytes == 0 && _commentsNotRequested.incrementAndGet() >= MAX_COMMENT_NOT_REQ)
_commentsLastRequested.set(_util.getContext().clock().now());
} }
} catch (InvalidBEncodingException ibee) {} } catch (InvalidBEncodingException ibee) {}
} }
@ -1671,6 +1709,9 @@ class PeerCoordinator implements PeerListener
interestedAndChoking.addAndGet(toAdd); interestedAndChoking.addAndGet(toAdd);
} }
/**
* Is snark as a whole over its limit?
*/
public boolean overUpBWLimit() public boolean overUpBWLimit()
{ {
if (listener != null) if (listener != null)
@ -1678,6 +1719,10 @@ class PeerCoordinator implements PeerListener
return false; return false;
} }
/**
* Is a particular peer who has downloaded this many bytes from us
* in the last CHECK_PERIOD over its limit?
*/
public boolean overUpBWLimit(long total) public boolean overUpBWLimit(long total)
{ {
if (listener != null) if (listener != null)