* i2psnark:

- Add idle detector, reduce tunnel count when idle (prep for torrent updates)
   - Cancel CoordinatorAcceptor cleaner when halted
   - Make PeerCoordinatorSet an Iterable
   - Reduce max protocol errors to 1
   - Disable unused PeerMonitorTask
This commit is contained in:
zzz
2013-06-01 16:57:50 +00:00
parent 7d08183334
commit ff20174572
9 changed files with 190 additions and 18 deletions

View File

@ -37,7 +37,9 @@ class BWLimits {
return rv; return rv;
} }
/****
public static void main(String args[]) { public static void main(String args[]) {
System.out.println(Arrays.toString(getBWLimits("127.0.0.1", 7654))); System.out.println(Arrays.toString(getBWLimits("127.0.0.1", 7654)));
} }
****/
} }

View File

@ -33,11 +33,10 @@ import net.i2p.data.Hash;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.ObjectCounter; import net.i2p.util.ObjectCounter;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer2;
import net.i2p.util.SimpleTimer;
/** /**
* Accepts connections on a TCP port and routes them to sub-acceptors. * Accepts connections on a I2PServerSocket and routes them to PeerAcceptors.
*/ */
class ConnectionAcceptor implements Runnable class ConnectionAcceptor implements Runnable
{ {
@ -47,14 +46,22 @@ class ConnectionAcceptor implements Runnable
private Thread thread; private Thread thread;
private final I2PSnarkUtil _util; private final I2PSnarkUtil _util;
private final ObjectCounter<Hash> _badCounter = new ObjectCounter(); private final ObjectCounter<Hash> _badCounter = new ObjectCounter();
private final SimpleTimer2.TimedEvent _cleaner;
private boolean stop; private volatile boolean stop;
private boolean socketChanged; private boolean socketChanged;
private static final int MAX_BAD = 2; // protocol errors before blacklisting.
private static final int MAX_BAD = 1;
private static final long BAD_CLEAN_INTERVAL = 30*60*1000; private static final long BAD_CLEAN_INTERVAL = 30*60*1000;
public ConnectionAcceptor(I2PSnarkUtil util) { _util = util; } /**
* Multitorrent
*/
public ConnectionAcceptor(I2PSnarkUtil util) {
_util = util;
_cleaner = new Cleaner();
}
public synchronized void startAccepting(PeerCoordinatorSet set, I2PServerSocket socket) { public synchronized void startAccepting(PeerCoordinatorSet set, I2PServerSocket socket) {
if (serverSocket != socket) { if (serverSocket != socket) {
@ -67,11 +74,14 @@ class ConnectionAcceptor implements Runnable
thread = new I2PAppThread(this, "I2PSnark acceptor"); thread = new I2PAppThread(this, "I2PSnark acceptor");
thread.setDaemon(true); thread.setDaemon(true);
thread.start(); thread.start();
_util.getContext().simpleScheduler().addPeriodicEvent(new Cleaner(), BAD_CLEAN_INTERVAL); _cleaner.schedule(BAD_CLEAN_INTERVAL);
} }
} }
} }
/**
* Unused (single torrent)
*/
public ConnectionAcceptor(I2PSnarkUtil util, I2PServerSocket serverSocket, public ConnectionAcceptor(I2PSnarkUtil util, I2PServerSocket serverSocket,
PeerAcceptor peeracceptor) PeerAcceptor peeracceptor)
{ {
@ -82,7 +92,7 @@ class ConnectionAcceptor implements Runnable
thread = new I2PAppThread(this, "I2PSnark acceptor"); thread = new I2PAppThread(this, "I2PSnark acceptor");
thread.setDaemon(true); thread.setDaemon(true);
thread.start(); thread.start();
_util.getContext().simpleScheduler().addPeriodicEvent(new Cleaner(), BAD_CLEAN_INTERVAL); _cleaner = new Cleaner();
} }
public void halt() public void halt()
@ -101,14 +111,20 @@ class ConnectionAcceptor implements Runnable
Thread t = thread; Thread t = thread;
if (t != null) if (t != null)
t.interrupt(); t.interrupt();
_cleaner.cancel();
} }
/**
* Effectively unused, would only be called if we changed
* I2CP host/port, which is hidden in the gui if in router context
*/
public void restart() { public void restart() {
serverSocket = _util.getServerSocket(); serverSocket = _util.getServerSocket();
socketChanged = true; socketChanged = true;
Thread t = thread; Thread t = thread;
if (t != null) if (t != null)
t.interrupt(); t.interrupt();
_cleaner.schedule(BAD_CLEAN_INTERVAL);
} }
public int getPort() public int getPort()
@ -150,9 +166,11 @@ class ConnectionAcceptor implements Runnable
try { socket.close(); } catch (IOException ioe) {} try { socket.close(); } catch (IOException ioe) {}
continue; continue;
} }
if (_badCounter.count(socket.getPeerDestination().calculateHash()) >= MAX_BAD) { int bad = _badCounter.count(socket.getPeerDestination().calculateHash());
if (count >= MAX_BAD) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting connection from " + socket.getPeerDestination().calculateHash() + " after " + MAX_BAD + " failures"); _log.warn("Rejecting connection from " + socket.getPeerDestination().calculateHash() +
" after " + count + " failures, max is " + MAX_BAD);
try { socket.close(); } catch (IOException ioe) {} try { socket.close(); } catch (IOException ioe) {}
continue; continue;
} }
@ -214,7 +232,17 @@ class ConnectionAcceptor implements Runnable
} }
/** @since 0.9.1 */ /** @since 0.9.1 */
private class Cleaner implements SimpleTimer.TimedEvent { private class Cleaner extends SimpleTimer2.TimedEvent {
public void timeReached() { _badCounter.clear(); }
public Cleaner() {
super(_util.getContext().simpleTimer2());
}
public void timeReached() {
if (stop)
return;
_badCounter.clear();
schedule(BAD_CLEAN_INTERVAL);
}
} }
} }

View File

@ -37,8 +37,20 @@ interface CoordinatorListener
*/ */
void gotMetaInfo(PeerCoordinator coordinator, MetaInfo metainfo); void gotMetaInfo(PeerCoordinator coordinator, MetaInfo metainfo);
/**
* Is this number of uploaders over the per-torrent limit?
*/
public boolean overUploadLimit(int uploaders); public boolean overUploadLimit(int uploaders);
/**
* Are we currently over the upstream bandwidth limit?
*/
public boolean overUpBWLimit(); public boolean overUpBWLimit();
/**
* Is the total (in Bps) over the upstream bandwidth limit?
*/
public boolean overUpBWLimit(long total); public boolean overUpBWLimit(long total);
public void addMessage(String message); public void addMessage(String message);
} }

View File

@ -158,7 +158,7 @@ public class I2PSnarkUtil {
} }
/** /**
* This updates the session options and tells the router * This updates ALL the session options (not just the bw) and tells the router
* @param limit KBps * @param limit KBps
*/ */
public void setMaxUpBW(int limit) { public void setMaxUpBW(int limit) {

View File

@ -0,0 +1,120 @@
/*
* Released into the public domain
* with no warranty of any kind, either expressed or implied.
*/
package org.klomp.snark;
import java.util.Map;
import java.util.Properties;
import net.i2p.client.I2PSession;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* Periodically check for idle condition based on connected peers,
* and reduce/restore tunnel count as necessary.
* We can't use the I2CP idle detector because it's based on traffic,
* so DHT and announces would keep it non-idle.
*
* @since 0.9.7
*/
class IdleChecker extends SimpleTimer2.TimedEvent {
private final I2PSnarkUtil _util;
private final PeerCoordinatorSet _pcs;
private final Log _log;
private int _consec;
private boolean _isIdle;
private static final long CHECK_TIME = 63*1000;
private static final int MAX_CONSEC_IDLE = 4;
/**
* Caller must schedule
*/
public IdleChecker(I2PSnarkUtil util, PeerCoordinatorSet pcs) {
super(util.getContext().simpleTimer2());
_log = util.getContext().logManager().getLog(IdleChecker.class);
_util = util;
_pcs = pcs;
}
public void timeReached() {
if (_util.connected()) {
boolean hasPeers = false;
for (PeerCoordinator pc : _pcs) {
if (pc.getPeers() > 0) {
hasPeers = true;
break;
}
}
if (hasPeers) {
if (_isIdle)
restoreTunnels();
} else {
if (!_isIdle) {
if (_consec++ >= MAX_CONSEC_IDLE)
reduceTunnels();
}
}
} else {
_isIdle = false;
_consec = 0;
}
schedule(CHECK_TIME);
}
/**
* Reduce to 1 in / 1 out tunnel
*/
private void reduceTunnels() {
_isIdle = true;
if (_log.shouldLog(Log.INFO))
_log.info("Reducing tunnels on idle");
setTunnels("1", "1", "0", "0");
}
/**
* Restore tunnel count
*/
private void restoreTunnels() {
_isIdle = false;
if (_log.shouldLog(Log.INFO))
_log.info("Restoring tunnels on activity");
Map<String, String> opts = _util.getI2CPOptions();
String i = opts.get("inbound.quantity");
if (i == null)
i = "3";
String o = opts.get("outbound.quantity");
if (o == null)
o = "3";
String ib = opts.get("inbound.backupQuantity");
if (ib == null)
ib = "0";
String ob= opts.get("outbound.backupQuantity");
if (ob == null)
ob = "0";
setTunnels(i, o, ib, ob);
}
/**
* Set in / out / in backup / out backup tunnel counts
*/
private void setTunnels(String i, String o, String ib, String ob) {
_consec = 0;
I2PSocketManager mgr = _util.getSocketManager();
if (mgr != null) {
I2PSession sess = mgr.getSession();
if (sess != null) {
Properties newProps = new Properties();
newProps.setProperty("inbound.quantity", i);
newProps.setProperty("outbound.quantity", o);
newProps.setProperty("inbound.backupQuantity", ib);
newProps.setProperty("outbound.backupQuantity", ob);
sess.updateOptions(newProps);
}
}
}
}

View File

@ -12,7 +12,7 @@ import net.i2p.crypto.SHA1Hash;
* Each PeerCoordinator is added to the set from within the Snark (and removed * Each PeerCoordinator is added to the set from within the Snark (and removed
* from it there too) * from it there too)
*/ */
class PeerCoordinatorSet { class PeerCoordinatorSet implements Iterable<PeerCoordinator> {
private final Map<SHA1Hash, PeerCoordinator> _coordinators; private final Map<SHA1Hash, PeerCoordinator> _coordinators;
public PeerCoordinatorSet() { public PeerCoordinatorSet() {

View File

@ -27,6 +27,8 @@ import net.i2p.data.DataHelper;
/** /**
* TimerTask that monitors the peers and total up/download speeds. * TimerTask that monitors the peers and total up/download speeds.
* Works together with the main Snark class to report periodical statistics. * Works together with the main Snark class to report periodical statistics.
*
* @deprecated unused, for command line client only, commented out in Snark.java
*/ */
class PeerMonitorTask implements Runnable class PeerMonitorTask implements Runnable
{ {
@ -45,6 +47,7 @@ class PeerMonitorTask implements Runnable
public void run() public void run()
{ {
/*****
// Get some statistics // Get some statistics
int peers = 0; int peers = 0;
int uploaders = 0; int uploaders = 0;
@ -117,5 +120,6 @@ class PeerMonitorTask implements Runnable
lastDownloaded = downloaded; lastDownloaded = downloaded;
lastUploaded = uploaded; lastUploaded = uploaded;
****/
} }
} }

View File

@ -1226,8 +1226,7 @@ public class Snark
if (_peerCoordinatorSet == null || uploaders <= 0) if (_peerCoordinatorSet == null || uploaders <= 0)
return false; return false;
int totalUploaders = 0; int totalUploaders = 0;
for (Iterator<PeerCoordinator> iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) { for (PeerCoordinator c : _peerCoordinatorSet) {
PeerCoordinator c = iter.next();
if (!c.halted()) if (!c.halted())
totalUploaders += c.uploaders; totalUploaders += c.uploaders;
} }
@ -1240,8 +1239,7 @@ public class Snark
if (_peerCoordinatorSet == null) if (_peerCoordinatorSet == null)
return false; return false;
long total = 0; long total = 0;
for (Iterator<PeerCoordinator> iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) { for (PeerCoordinator c : _peerCoordinatorSet) {
PeerCoordinator c = iter.next();
if (!c.halted()) if (!c.halted())
total += c.getCurrentUploadRate(); total += c.getCurrentUploadRate();
} }

View File

@ -37,6 +37,7 @@ import net.i2p.util.SecureDirectory;
import net.i2p.util.SecureFileOutputStream; import net.i2p.util.SecureFileOutputStream;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
import org.klomp.snark.dht.DHT; import org.klomp.snark.dht.DHT;
@ -70,6 +71,7 @@ public class SnarkManager implements CompleteListener {
private final Map<String, Tracker> _trackerMap; private final Map<String, Tracker> _trackerMap;
private UpdateManager _umgr; private UpdateManager _umgr;
private UpdateHandler _uhandler; private UpdateHandler _uhandler;
private SimpleTimer2.TimedEvent _idleChecker;
public static final String PROP_I2CP_HOST = "i2psnark.i2cpHost"; public static final String PROP_I2CP_HOST = "i2psnark.i2cpHost";
public static final String PROP_I2CP_PORT = "i2psnark.i2cpPort"; public static final String PROP_I2CP_PORT = "i2psnark.i2cpPort";
@ -178,6 +180,8 @@ public class SnarkManager implements CompleteListener {
_context.simpleScheduler().addEvent(new Register(), 4*60*1000); _context.simpleScheduler().addEvent(new Register(), 4*60*1000);
// Not required, Jetty has a shutdown hook // Not required, Jetty has a shutdown hook
//_context.addShutdownTask(new SnarkManagerShutdown()); //_context.addShutdownTask(new SnarkManagerShutdown());
_idleChecker = new IdleChecker(_util, _peerCoordinatorSet);
_idleChecker.schedule(5*60*1000);
} }
/** @since 0.9.4 */ /** @since 0.9.4 */
@ -210,6 +214,7 @@ public class SnarkManager implements CompleteListener {
_running = false; _running = false;
_monitor.interrupt(); _monitor.interrupt();
_connectionAcceptor.halt(); _connectionAcceptor.halt();
_idleChecker.cancel();
stopAllTorrents(true); stopAllTorrents(true);
} }
@ -635,6 +640,7 @@ public class SnarkManager implements CompleteListener {
addMessage(_("I2CP options changed to {0}", i2cpOpts)); addMessage(_("I2CP options changed to {0}", i2cpOpts));
_util.setI2CPConfig(oldI2CPHost, oldI2CPPort, opts); _util.setI2CPConfig(oldI2CPHost, oldI2CPPort, opts);
} else { } else {
// Won't happen, I2CP host/port, are hidden in the GUI if in router context
if (_util.connected()) { if (_util.connected()) {
_util.disconnect(); _util.disconnect();
addMessage(_("Disconnecting old I2CP destination")); addMessage(_("Disconnecting old I2CP destination"));
@ -658,6 +664,8 @@ public class SnarkManager implements CompleteListener {
for (Snark snark : _snarks.values()) { for (Snark snark : _snarks.values()) {
if (snark.restartAcceptor()) { if (snark.restartAcceptor()) {
addMessage(_("I2CP listener restarted for \"{0}\"", snark.getBaseName())); addMessage(_("I2CP listener restarted for \"{0}\"", snark.getBaseName()));
// this is the common ConnectionAcceptor, so we only need to do it once
break;
} }
} }
} }