diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java index 69f0bb8571..2b24d09095 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java @@ -15,8 +15,8 @@ import net.i2p.util.SimpleTimer; */ class ConnThrottler { private final ObjectCounter counter; - private final int _max; - private final int _totalMax; + private volatile int _max; + private volatile int _totalMax; private final AtomicInteger _currentTotal; /* @@ -27,17 +27,21 @@ class ConnThrottler { ConnThrottler(int max, int totalMax, long period) { _max = max; _totalMax = totalMax; - if (max > 0) - this.counter = new ObjectCounter(); - else - this.counter = null; - if (totalMax > 0) - _currentTotal = new AtomicInteger(); - else - _currentTotal = null; + this.counter = new ObjectCounter(); + _currentTotal = new AtomicInteger(); SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), period); } + /* + * @param max per-peer, 0 for unlimited + * @param totalMax for all peers, 0 for unlimited + * @since 0.9.3 + */ + public void updateLimits(int max, int totalMax) { + _max = max; + _totalMax = totalMax; + } + /** * Checks both individual and total. Increments before checking. */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 3547b55440..5047a04541 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -4,6 +4,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; import net.i2p.I2PAppContext; @@ -12,6 +13,8 @@ import net.i2p.client.I2PSession; import net.i2p.data.Destination; import net.i2p.data.Hash; import net.i2p.data.SessionKey; +import net.i2p.util.ConcurrentHashSet; +import net.i2p.util.ConvertToHash; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; @@ -35,21 +38,28 @@ class ConnectionManager { private final ConcurrentHashMap _connectionByInboundId; /** Ping ID (Long) to PingRequest */ private final Map _pendingPings; - private boolean _throttlersInitialized; - private int _maxConcurrentStreams; + private volatile boolean _throttlersInitialized; private final ConnectionOptions _defaultOptions; private volatile int _numWaiting; private long _soTimeout; - private ConnThrottler _minuteThrottler; - private ConnThrottler _hourThrottler; - private ConnThrottler _dayThrottler; + private volatile ConnThrottler _minuteThrottler; + private volatile ConnThrottler _hourThrottler; + private volatile ConnThrottler _dayThrottler; /** since 0.9, each manager instantiates its own timer */ private final SimpleTimer2 _timer; + /** cache of the property to detect changes */ + private static volatile String _currentBlacklist = ""; + private static final Set _globalBlacklist = new ConcurrentHashSet(); - public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { + /** @since 0.9.3 */ + public static final String PROP_BLACKLIST = "i2p.streaming.blacklist"; + + /** + * Manage all conns for this session + */ + public ConnectionManager(I2PAppContext context, I2PSession session, ConnectionOptions defaultOptions) { _context = context; _session = session; - _maxConcurrentStreams = maxConcurrent; _defaultOptions = defaultOptions; _log = _context.logManager().getLog(ConnectionManager.class); _connectionByInboundId = new ConcurrentHashMap(32); @@ -128,23 +138,44 @@ class ConnectionManager { public void setAllowIncomingConnections(boolean allow) { _connectionHandler.setActive(allow); - if (allow && !_throttlersInitialized) { - _throttlersInitialized = true; - if (_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) { - _context.statManager().createRateStat("stream.con.throttledMinute", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 }); - _minuteThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute(), 60*1000); - } - if (_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) { - _context.statManager().createRateStat("stream.con.throttledHour", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 }); - _hourThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour(), 60*60*1000); - } - if (_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) { - _context.statManager().createRateStat("stream.con.throttledDay", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 }); - _dayThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay(), 24*60*60*1000); + if (allow) { + synchronized(this) { + if (!_throttlersInitialized) { + updateOptions(); + _throttlersInitialized = true; + } } } } + /* + * Update the throttler options + * @since 0.9.3 + */ + public synchronized void updateOptions() { + if ((_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) && + _minuteThrottler == null) { + _context.statManager().createRateStat("stream.con.throttledMinute", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 }); + _minuteThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute(), 60*1000); + } else if (_minuteThrottler != null) { + _minuteThrottler.updateLimits(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute()); + } + if ((_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) && + _hourThrottler == null) { + _context.statManager().createRateStat("stream.con.throttledHour", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 }); + _hourThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour(), 60*60*1000); + } else if (_hourThrottler != null) { + _hourThrottler.updateLimits(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour()); + } + if ((_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) && + _dayThrottler == null) { + _context.statManager().createRateStat("stream.con.throttledDay", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 }); + _dayThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay(), 24*60*60*1000); + } else if (_dayThrottler != null) { + _dayThrottler.updateLimits(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay()); + } + } + /** @return if we should accept connections */ public boolean getAllowIncomingConnections() { return _connectionHandler.getActive(); @@ -177,7 +208,7 @@ class ConnectionManager { //} if (locked_tooManyStreams()) { _log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of " - + _maxConcurrentStreams + " connections"); + + _defaultOptions.getMaxConns() + " connections"); reject = true; } else { // this may not be right if more than one is enabled @@ -267,16 +298,17 @@ class ConnectionManager { long remaining = expiration - _context.clock().now(); if (remaining <= 0) { _log.logAlways(Log.WARN, "Refusing to connect since we have exceeded our max of " - + _maxConcurrentStreams + " connections"); + + _defaultOptions.getMaxConns() + " connections"); _numWaiting--; return null; } if (locked_tooManyStreams()) { + int max = _defaultOptions.getMaxConns(); // allow a full buffer of pending/waiting streams - if (_numWaiting > _maxConcurrentStreams) { + if (_numWaiting > max) { _log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of " - + _maxConcurrentStreams + " and there are " + _numWaiting + + max + " and there are " + _numWaiting + " waiting already"); _numWaiting--; return null; @@ -320,8 +352,9 @@ class ConnectionManager { * @return too many */ private boolean locked_tooManyStreams() { - if (_maxConcurrentStreams <= 0) return false; - if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; + int max = _defaultOptions.getMaxConns(); + if (max <= 0) return false; + if (_connectionByInboundId.size() < max) return false; int active = 0; for (Connection con : _connectionByInboundId.values()) { if (con.getIsConnected()) @@ -332,7 +365,7 @@ class ConnectionManager { _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size()); - return (active >= _maxConcurrentStreams); + return (active >= max); } /** @@ -398,6 +431,32 @@ class ConnectionManager { if (_defaultOptions.isBlacklistEnabled() && _defaultOptions.getBlacklist().contains(h)) return "blacklisted"; + String hashes = _context.getProperty(PROP_BLACKLIST, ""); + if (!_currentBlacklist.equals(hashes)) { + // rebuild _globalBlacklist when property changes + synchronized(_globalBlacklist) { + if (hashes != null) { + Set newSet = new HashSet(); + StringTokenizer tok = new StringTokenizer(hashes, ",; "); + while (tok.hasMoreTokens()) { + String hashstr = tok.nextToken(); + Hash hh = ConvertToHash.getHash(hashstr); + if (hh != null) + newSet.add(hh); + else + _log.error("Bad blacklist entry: " + hashstr); + } + _globalBlacklist.addAll(newSet); + _globalBlacklist.retainAll(newSet); + _currentBlacklist = hashes; + } else { + _globalBlacklist.clear(); + _currentBlacklist = ""; + } + } + } + if (hashes.length() > 0 && _globalBlacklist.contains(h)) + return "blacklisted globally"; return null; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 95eef4687a..ee01d462c8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -14,6 +14,8 @@ import net.i2p.util.Log; /** * Define the current options for the con (and allow custom tweaking midstream) * + * TODO many of these are not per-connection options, and should be migrated + * somewhere so they aren't copied for every connection */ class ConnectionOptions extends I2PSocketOptionsImpl { private int _connectDelay; @@ -47,6 +49,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _maxTotalConnsPerMinute; private int _maxTotalConnsPerHour; private int _maxTotalConnsPerDay; + private int _maxConns; // NOTE - almost all the options are below, but see // I2PSocketOptions in ministreaming for a few more @@ -90,6 +93,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_MAX_TOTAL_CONNS_DAY = "i2p.streaming.maxTotalConnsPerDay"; /** @since 0.9.1 */ public static final String PROP_ENFORCE_PROTO = "i2p.streaming.enforceProtocol"; + /** + * how many streams will we allow at once? + * @since 0.9.3 moved from I2PSocketManagerFull + */ + public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; + private static final int TREND_COUNT = 3; static final int INITIAL_WINDOW_SIZE = 6; @@ -308,6 +317,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerMinute = opts.getMaxTotalConnsPerMinute(); _maxTotalConnsPerHour = opts.getMaxTotalConnsPerHour(); _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay(); + _maxConns = opts.getMaxConns(); } /** @@ -344,6 +354,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerMinute = getInt(opts, PROP_MAX_TOTAL_CONNS_MIN, 0); _maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0); _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); + _maxConns = getInt(opts, PROP_MAX_STREAMS, 0); } /** @@ -408,6 +419,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0); if (opts.containsKey(PROP_MAX_TOTAL_CONNS_DAY)) _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); + if (opts.containsKey(PROP_MAX_STREAMS)) + _maxConns = getInt(opts, PROP_MAX_STREAMS, 0); } /** @@ -660,6 +673,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public int getMaxTotalConnsPerMinute() { return _maxTotalConnsPerMinute; } public int getMaxTotalConnsPerHour() { return _maxTotalConnsPerHour; } public int getMaxTotalConnsPerDay() { return _maxTotalConnsPerDay; } + /** @since 0.9.3; no public setter */ + public int getMaxConns() { return _maxConns; } public boolean isAccessListEnabled() { return _accessListEnabled; } public boolean isBlacklistEnabled() { return _blackListEnabled; } @@ -691,7 +706,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { String hashes = opts.getProperty(PROP_ACCESS_LIST); if (hashes == null) return; - StringTokenizer tok = new StringTokenizer(hashes, ", "); + StringTokenizer tok = new StringTokenizer(hashes, ",; "); while (tok.hasMoreTokens()) { String hashstr = tok.nextToken(); Hash h = ConvertToHash.getHash(hashstr); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index cfc0bf7838..dc16811913 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -36,7 +36,6 @@ public class I2PSocketManagerFull implements I2PSocketManager { private final ConnectionOptions _defaultOptions; private long _acceptTimeout; private String _name; - private int _maxStreams; private static int __managerId = 0; private final ConnectionManager _connectionManager; @@ -54,9 +53,6 @@ public class I2PSocketManagerFull implements I2PSocketManager { throw new UnsupportedOperationException(); } - /** how many streams will we allow at once? */ - public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; - /** * @deprecated use 4-arg constructor * @throws UnsupportedOperationException always @@ -79,19 +75,10 @@ public class I2PSocketManagerFull implements I2PSocketManager { _session = session; _log = _context.logManager().getLog(I2PSocketManagerFull.class); - _maxStreams = -1; - try { - String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); - _maxStreams = Integer.parseInt(num); - } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); - _maxStreams = -1; - } _name = name + " " + (++__managerId); _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; _defaultOptions = new ConnectionOptions(opts); - _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions); + _connectionManager = new ConnectionManager(_context, _session, _defaultOptions); _serverSocket = new I2PServerSocketFull(this); if (_log.shouldLog(Log.INFO)) { @@ -182,6 +169,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { if (_log.shouldLog(Log.WARN)) _log.warn("Changing options from:\n " + _defaultOptions + "\nto:\n " + options); _defaultOptions.updateAll((ConnectionOptions) options); + _connectionManager.updateOptions(); } /** @@ -244,7 +232,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { // the following blocks unless connect delay > 0 Connection con = _connectionManager.connect(peer, opts); if (con == null) - throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); + throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns()); I2PSocketFull socket = new I2PSocketFull(con); con.setSocket(socket); if (con.getConnectionError() != null) { diff --git a/history.txt b/history.txt index add456e93b..7af4f12cc5 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,15 @@ +2012-09-xx zzz + * Addresses: Reject numeric IPs of the form n, n.n, and n.n.n + * Console, i2ptunnel: More validation of address and port in forms + * ConvertToHash: + - Add support for b64hash.i2p + - Cleanup and use cache + * i2psnark: Enable DHT by default + * RFC822Date: Synchronization fix + * Streaming: + - Implement changing connection limits on a running session + - Implement global blacklist + 2012-09-25 zzz * Context: Make files final * EventLog: Fix IAE on portable @@ -5,7 +17,7 @@ * OutboundEstablishState: Cleanup (ticket #671) * SimpleByteCache: Concurrent fix * UPnP: Cleanup & final - * URLLauncher: Add xdg-open (ticket #617) + * URLLauncher: Add xdg-open (ticket #717) 2012-09-21 zzz * BuildHandler: Use CoDel for inbound queue diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 282c18b422..0725033fa6 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 2; + public final static long BUILD = 3; /** for example "-test" */ public final static String EXTRA = "";