* Streaming:

- Implement changing connection limits on a running session
   - Implement global blacklist
This commit is contained in:
zzz
2012-09-26 20:02:36 +00:00
parent 5d3984e353
commit 20e463e41b
6 changed files with 133 additions and 55 deletions

View File

@ -15,8 +15,8 @@ import net.i2p.util.SimpleTimer;
*/
class ConnThrottler {
private final ObjectCounter<Hash> counter;
private final int _max;
private final int _totalMax;
private volatile int _max;
private volatile int _totalMax;
private final AtomicInteger _currentTotal;
/*
@ -27,17 +27,21 @@ class ConnThrottler {
ConnThrottler(int max, int totalMax, long period) {
_max = max;
_totalMax = totalMax;
if (max > 0)
this.counter = new ObjectCounter<Hash>();
else
this.counter = null;
if (totalMax > 0)
_currentTotal = new AtomicInteger();
else
_currentTotal = null;
this.counter = new ObjectCounter<Hash>();
_currentTotal = new AtomicInteger();
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), period);
}
/*
* @param max per-peer, 0 for unlimited
* @param totalMax for all peers, 0 for unlimited
* @since 0.9.3
*/
public void updateLimits(int max, int totalMax) {
_max = max;
_totalMax = totalMax;
}
/**
* Checks both individual and total. Increments before checking.
*/

View File

@ -4,6 +4,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
@ -12,6 +13,8 @@ import net.i2p.client.I2PSession;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.ConvertToHash;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
@ -35,21 +38,28 @@ class ConnectionManager {
private final ConcurrentHashMap<Long, Connection> _connectionByInboundId;
/** Ping ID (Long) to PingRequest */
private final Map<Long, PingRequest> _pendingPings;
private boolean _throttlersInitialized;
private int _maxConcurrentStreams;
private volatile boolean _throttlersInitialized;
private final ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private long _soTimeout;
private ConnThrottler _minuteThrottler;
private ConnThrottler _hourThrottler;
private ConnThrottler _dayThrottler;
private volatile ConnThrottler _minuteThrottler;
private volatile ConnThrottler _hourThrottler;
private volatile ConnThrottler _dayThrottler;
/** since 0.9, each manager instantiates its own timer */
private final SimpleTimer2 _timer;
/** cache of the property to detect changes */
private static volatile String _currentBlacklist = "";
private static final Set<Hash> _globalBlacklist = new ConcurrentHashSet();
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
/** @since 0.9.3 */
public static final String PROP_BLACKLIST = "i2p.streaming.blacklist";
/**
* Manage all conns for this session
*/
public ConnectionManager(I2PAppContext context, I2PSession session, ConnectionOptions defaultOptions) {
_context = context;
_session = session;
_maxConcurrentStreams = maxConcurrent;
_defaultOptions = defaultOptions;
_log = _context.logManager().getLog(ConnectionManager.class);
_connectionByInboundId = new ConcurrentHashMap(32);
@ -128,23 +138,44 @@ class ConnectionManager {
public void setAllowIncomingConnections(boolean allow) {
_connectionHandler.setActive(allow);
if (allow && !_throttlersInitialized) {
_throttlersInitialized = true;
if (_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) {
_context.statManager().createRateStat("stream.con.throttledMinute", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_minuteThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute(), 60*1000);
}
if (_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) {
_context.statManager().createRateStat("stream.con.throttledHour", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_hourThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour(), 60*60*1000);
}
if (_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) {
_context.statManager().createRateStat("stream.con.throttledDay", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_dayThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay(), 24*60*60*1000);
if (allow) {
synchronized(this) {
if (!_throttlersInitialized) {
updateOptions();
_throttlersInitialized = true;
}
}
}
}
/*
* Update the throttler options
* @since 0.9.3
*/
public synchronized void updateOptions() {
if ((_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) &&
_minuteThrottler == null) {
_context.statManager().createRateStat("stream.con.throttledMinute", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_minuteThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute(), 60*1000);
} else if (_minuteThrottler != null) {
_minuteThrottler.updateLimits(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute());
}
if ((_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) &&
_hourThrottler == null) {
_context.statManager().createRateStat("stream.con.throttledHour", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_hourThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour(), 60*60*1000);
} else if (_hourThrottler != null) {
_hourThrottler.updateLimits(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour());
}
if ((_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) &&
_dayThrottler == null) {
_context.statManager().createRateStat("stream.con.throttledDay", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_dayThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay(), 24*60*60*1000);
} else if (_dayThrottler != null) {
_dayThrottler.updateLimits(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay());
}
}
/** @return if we should accept connections */
public boolean getAllowIncomingConnections() {
return _connectionHandler.getActive();
@ -177,7 +208,7 @@ class ConnectionManager {
//}
if (locked_tooManyStreams()) {
_log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of "
+ _maxConcurrentStreams + " connections");
+ _defaultOptions.getMaxConns() + " connections");
reject = true;
} else {
// this may not be right if more than one is enabled
@ -267,16 +298,17 @@ class ConnectionManager {
long remaining = expiration - _context.clock().now();
if (remaining <= 0) {
_log.logAlways(Log.WARN, "Refusing to connect since we have exceeded our max of "
+ _maxConcurrentStreams + " connections");
+ _defaultOptions.getMaxConns() + " connections");
_numWaiting--;
return null;
}
if (locked_tooManyStreams()) {
int max = _defaultOptions.getMaxConns();
// allow a full buffer of pending/waiting streams
if (_numWaiting > _maxConcurrentStreams) {
if (_numWaiting > max) {
_log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of "
+ _maxConcurrentStreams + " and there are " + _numWaiting
+ max + " and there are " + _numWaiting
+ " waiting already");
_numWaiting--;
return null;
@ -320,8 +352,9 @@ class ConnectionManager {
* @return too many
*/
private boolean locked_tooManyStreams() {
if (_maxConcurrentStreams <= 0) return false;
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
int max = _defaultOptions.getMaxConns();
if (max <= 0) return false;
if (_connectionByInboundId.size() < max) return false;
int active = 0;
for (Connection con : _connectionByInboundId.values()) {
if (con.getIsConnected())
@ -332,7 +365,7 @@ class ConnectionManager {
_log.info("More than 100 connections! " + active
+ " total: " + _connectionByInboundId.size());
return (active >= _maxConcurrentStreams);
return (active >= max);
}
/**
@ -398,6 +431,32 @@ class ConnectionManager {
if (_defaultOptions.isBlacklistEnabled() &&
_defaultOptions.getBlacklist().contains(h))
return "blacklisted";
String hashes = _context.getProperty(PROP_BLACKLIST, "");
if (!_currentBlacklist.equals(hashes)) {
// rebuild _globalBlacklist when property changes
synchronized(_globalBlacklist) {
if (hashes != null) {
Set<Hash> newSet = new HashSet();
StringTokenizer tok = new StringTokenizer(hashes, ",; ");
while (tok.hasMoreTokens()) {
String hashstr = tok.nextToken();
Hash hh = ConvertToHash.getHash(hashstr);
if (hh != null)
newSet.add(hh);
else
_log.error("Bad blacklist entry: " + hashstr);
}
_globalBlacklist.addAll(newSet);
_globalBlacklist.retainAll(newSet);
_currentBlacklist = hashes;
} else {
_globalBlacklist.clear();
_currentBlacklist = "";
}
}
}
if (hashes.length() > 0 && _globalBlacklist.contains(h))
return "blacklisted globally";
return null;
}

View File

@ -14,6 +14,8 @@ import net.i2p.util.Log;
/**
* Define the current options for the con (and allow custom tweaking midstream)
*
* TODO many of these are not per-connection options, and should be migrated
* somewhere so they aren't copied for every connection
*/
class ConnectionOptions extends I2PSocketOptionsImpl {
private int _connectDelay;
@ -47,6 +49,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private int _maxTotalConnsPerMinute;
private int _maxTotalConnsPerHour;
private int _maxTotalConnsPerDay;
private int _maxConns;
// NOTE - almost all the options are below, but see
// I2PSocketOptions in ministreaming for a few more
@ -90,6 +93,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_MAX_TOTAL_CONNS_DAY = "i2p.streaming.maxTotalConnsPerDay";
/** @since 0.9.1 */
public static final String PROP_ENFORCE_PROTO = "i2p.streaming.enforceProtocol";
/**
* how many streams will we allow at once?
* @since 0.9.3 moved from I2PSocketManagerFull
*/
public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
private static final int TREND_COUNT = 3;
static final int INITIAL_WINDOW_SIZE = 6;
@ -308,6 +317,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerMinute = opts.getMaxTotalConnsPerMinute();
_maxTotalConnsPerHour = opts.getMaxTotalConnsPerHour();
_maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
_maxConns = opts.getMaxConns();
}
/**
@ -344,6 +354,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerMinute = getInt(opts, PROP_MAX_TOTAL_CONNS_MIN, 0);
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
}
/**
@ -408,6 +419,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
if (opts.containsKey(PROP_MAX_TOTAL_CONNS_DAY))
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
if (opts.containsKey(PROP_MAX_STREAMS))
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
}
/**
@ -660,6 +673,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public int getMaxTotalConnsPerMinute() { return _maxTotalConnsPerMinute; }
public int getMaxTotalConnsPerHour() { return _maxTotalConnsPerHour; }
public int getMaxTotalConnsPerDay() { return _maxTotalConnsPerDay; }
/** @since 0.9.3; no public setter */
public int getMaxConns() { return _maxConns; }
public boolean isAccessListEnabled() { return _accessListEnabled; }
public boolean isBlacklistEnabled() { return _blackListEnabled; }
@ -691,7 +706,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
String hashes = opts.getProperty(PROP_ACCESS_LIST);
if (hashes == null)
return;
StringTokenizer tok = new StringTokenizer(hashes, ", ");
StringTokenizer tok = new StringTokenizer(hashes, ",; ");
while (tok.hasMoreTokens()) {
String hashstr = tok.nextToken();
Hash h = ConvertToHash.getHash(hashstr);

View File

@ -36,7 +36,6 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private final ConnectionOptions _defaultOptions;
private long _acceptTimeout;
private String _name;
private int _maxStreams;
private static int __managerId = 0;
private final ConnectionManager _connectionManager;
@ -54,9 +53,6 @@ public class I2PSocketManagerFull implements I2PSocketManager {
throw new UnsupportedOperationException();
}
/** how many streams will we allow at once? */
public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
/**
* @deprecated use 4-arg constructor
* @throws UnsupportedOperationException always
@ -79,19 +75,10 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_session = session;
_log = _context.logManager().getLog(I2PSocketManagerFull.class);
_maxStreams = -1;
try {
String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
_maxStreams = Integer.parseInt(num);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
_maxStreams = -1;
}
_name = name + " " + (++__managerId);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_defaultOptions = new ConnectionOptions(opts);
_connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
_connectionManager = new ConnectionManager(_context, _session, _defaultOptions);
_serverSocket = new I2PServerSocketFull(this);
if (_log.shouldLog(Log.INFO)) {
@ -182,6 +169,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
if (_log.shouldLog(Log.WARN))
_log.warn("Changing options from:\n " + _defaultOptions + "\nto:\n " + options);
_defaultOptions.updateAll((ConnectionOptions) options);
_connectionManager.updateOptions();
}
/**
@ -244,7 +232,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
// the following blocks unless connect delay > 0
Connection con = _connectionManager.connect(peer, opts);
if (con == null)
throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns());
I2PSocketFull socket = new I2PSocketFull(con);
con.setSocket(socket);
if (con.getConnectionError() != null) {