diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java
index 0106e1affe..84a3858576 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java
@@ -105,6 +105,9 @@ public class TunnelController implements Logging {
public static final int DEFAULT_MAX_TOTAL_CONNS_DAY = 0;
public static final int DEFAULT_MAX_STREAMS = 20;
+ /** @since 0.9.34 */
+ public static final String PROP_LIMIT_ACTION = "i2p.streaming.limitAction";
+
/** @since 0.9.14 */
public static final String PFX_OPTION = "option.";
@@ -130,6 +133,9 @@ public class TunnelController implements Logging {
public static final String OPT_POST_MAX = PFX_OPTION + I2PTunnelHTTPServer.OPT_POST_MAX;
public static final String OPT_POST_TOTAL_MAX = PFX_OPTION + I2PTunnelHTTPServer.OPT_POST_TOTAL_MAX;
+ /** @since 0.9.34 */
+ private static final String OPT_LIMIT_ACTION = PFX_OPTION + PROP_LIMIT_ACTION;
+
/** all of these @since 0.9.14 */
public static final String TYPE_CONNECT = "connectclient";
public static final String TYPE_HTTP_BIDIR_SERVER = "httpbidirserver";
@@ -800,6 +806,10 @@ public class TunnelController implements Logging {
// is done in the I2PTunnelServer constructor.
String type = getType();
if (type != null) {
+ if (type.equals(TYPE_HTTP_SERVER)) {
+ if (!_config.containsKey(OPT_LIMIT_ACTION))
+ _config.setProperty(OPT_LIMIT_ACTION, "http");
+ }
if (type.equals(TYPE_HTTP_SERVER) || type.equals(TYPE_STREAMR_SERVER)) {
if (!_config.containsKey(OPT_BUNDLE_REPLY))
_config.setProperty(OPT_BUNDLE_REPLY, "false");
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnThrottler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnThrottler.java
index 3b69c11375..e58f46d7b6 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnThrottler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnThrottler.java
@@ -70,6 +70,16 @@ class ConnThrottler {
return false;
}
+ /**
+ * Checks if individual count is over the limit by this much. Does not increment.
+ * @since 0.9.34
+ */
+ boolean isOverBy(Hash h, int over) {
+ if (_max > 0)
+ return this.counter.count(h) > _max + over;
+ return false;
+ }
+
private class Cleaner implements SimpleTimer.TimedEvent {
public void timeReached() {
if (_totalMax > 0)
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
index f98062b662..1c24931c94 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
@@ -12,6 +12,7 @@ import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.data.ByteArray;
+import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
@@ -61,6 +62,21 @@ class ConnectionManager {
public static final String PROP_BLACKLIST = "i2p.streaming.blacklist";
private static final long MAX_PING_TIMEOUT = 5*60*1000;
private static final int MAX_PONG_PAYLOAD = 32;
+ /** once over throttle limits, respond this many times before just dropping */
+ private static final int DROP_OVER_LIMIT = 3;
+
+ // TODO https://stackoverflow.com/questions/16022624/examples-of-http-api-rate-limiting-http-response-headers
+ private static final String LIMIT_HTTP_RESPONSE =
+ "HTTP/1.1 429 Denied\r\n"+
+ "Content-Type: text/html; charset=iso-8859-1\r\n"+
+ "Cache-Control: no-cache\r\n"+
+ "Connection: close\r\n"+
+ "Proxy-Connection: close\r\n"+
+ "\r\n"+
+ "
429 Denied"+
+ "429 Denied
" +
+ "Denied due to excessive requests. Please try again later." +
+ "";
/**
* Manage all conns for this session
@@ -89,7 +105,7 @@ class ConnectionManager {
int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
_session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort());
_outboundQueue = new PacketQueue(_context, _timer);
- _recentlyClosed = new LHMCache(32);
+ _recentlyClosed = new LHMCache(64);
/** Socket timeout for accept() */
_soTimeout = -1;
@@ -217,9 +233,6 @@ class ConnectionManager {
ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setPort(synPacket.getRemotePort());
opts.setLocalPort(synPacket.getLocalPort());
- Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
- _timer, _outboundQueue, _conPacketHandler, opts, true);
- _tcbShare.updateOptsFromShare(con);
boolean reject = false;
int active = 0;
int total = 0;
@@ -243,8 +256,6 @@ class ConnectionManager {
_log.logAlways(Log.WARN, "Refusing connection since peer is " + why +
(synPacket.getOptionalFrom() == null ? "" : ": " + synPacket.getOptionalFrom().toBase32()));
reject = true;
- } else {
- assignReceiveStreamId(con);
}
}
@@ -254,40 +265,72 @@ class ConnectionManager {
Destination from = synPacket.getOptionalFrom();
if (from == null)
return null;
- if (_dayThrottler != null || _hourThrottler != null) {
- Hash h = from.calculateHash();
- if ((_hourThrottler != null && _hourThrottler.isThrottled(h)) ||
- (_dayThrottler != null && _dayThrottler.isThrottled(h)) ||
- _globalBlacklist.contains(h) ||
- (_defaultOptions.isAccessListEnabled() && !_defaultOptions.getAccessList().contains(h)) ||
- (_defaultOptions.isBlacklistEnabled() && _defaultOptions.getBlacklist().contains(h))) {
- // A signed RST packet + ElGamal + session tags is fairly expensive, so
- // once the hour/day limit is hit for a particular peer, don't even send it.
- // Ditto for blacklist / whitelist
- // This is a tradeoff, because it will keep retransmitting the SYN for a while,
- // thus more inbound, but let's not spend several KB on the outbound.
- if (!Boolean.valueOf(_context.getProperty("i2p.streaming.sendResetOnBlock"))) {
- // this is the default. Set property to send reset for debugging.
- if (_log.shouldLog(Log.INFO))
- _log.info("Dropping RST to " + h);
- return null;
- }
- }
+ String resp = _defaultOptions.getLimitAction();
+ if ("drop".equals(resp)) {
+ // always drop
+ return null;
}
+ Hash h = from.calculateHash();
+ if (_globalBlacklist.contains(h) ||
+ (_defaultOptions.isAccessListEnabled() && !_defaultOptions.getAccessList().contains(h)) ||
+ (_defaultOptions.isBlacklistEnabled() && _defaultOptions.getBlacklist().contains(h))) {
+ // always drop these regardless of setting
+ return null;
+ }
+
+ if ((_minuteThrottler != null && _minuteThrottler.isOverBy(h, DROP_OVER_LIMIT)) ||
+ (_hourThrottler != null && _hourThrottler.isOverBy(h, DROP_OVER_LIMIT)) ||
+ (_dayThrottler != null && _dayThrottler.isOverBy(h, DROP_OVER_LIMIT))) {
+ // A signed RST/close packet + ElGamal + session tags is fairly expensive, so
+ // once a limit is significantly exceeded for a particular peer, don't even send it.
+ // This is a tradeoff, because it will keep retransmitting the SYN for a while,
+ // thus more inbound, but let's not spend several KB on the outbound.
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Dropping limit response to " + from.toBase32());
+ return null;
+ }
+
+ boolean reset = resp == null || resp.equals("reset") || resp.length() <= 0;
+ boolean http = !reset && "http".equals(resp);
+ boolean custom = !(reset || http);
+ String sendResponse;
+ if (http) {
+ sendResponse = LIMIT_HTTP_RESPONSE;
+ } else if (custom) {
+ sendResponse = resp.replace("\\r", "\r").replace("\\n", "\n");
+ } else {
+ sendResponse = null;
+ }
+
PacketLocal reply = new PacketLocal(_context, from, synPacket.getSession());
- reply.setFlag(Packet.FLAG_RESET);
- reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
+ if (sendResponse != null) {
+ reply.setFlag(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE | Packet.FLAG_SIGNATURE_INCLUDED);
+ reply.setSequenceNum(0);
+ ByteArray payload = new ByteArray(DataHelper.getUTF8(sendResponse));
+ reply.setPayload(payload);
+ } else {
+ reply.setFlag(Packet.FLAG_RESET | Packet.FLAG_SIGNATURE_INCLUDED);
+ }
reply.setAckThrough(synPacket.getSequenceNum());
reply.setSendStreamId(synPacket.getReceiveStreamId());
- reply.setReceiveStreamId(0);
+ long rcvStreamId = assignRejectId();
+ reply.setReceiveStreamId(rcvStreamId);
reply.setOptionalFrom();
reply.setLocalPort(synPacket.getLocalPort());
reply.setRemotePort(synPacket.getRemotePort());
+ if (_log.shouldInfo())
+ //_log.info("Over limit, sending " + (sendResponse != null ? "configured response" : "reset") + " to " + from.toBase32());
+ _log.info("Over limit, sending " + reply + " to " + from.toBase32());
// this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply);
return null;
}
+ Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
+ _timer, _outboundQueue, _conPacketHandler, opts, true);
+ _tcbShare.updateOptsFromShare(con);
+ assignReceiveStreamId(con);
+
// finally, we know enough that we can log the packet with the conn filled in
if (I2PSocketManagerFull.pcapWriter != null &&
_context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP))
@@ -388,6 +431,26 @@ class ConnectionManager {
}
return receiveId;
}
+
+ /**
+ * Pick a new random stream ID that we are rejecting,
+ * taking care to avoid duplicates, and return it.
+ *
+ * @since 0.9.34
+ */
+ private long assignRejectId() {
+ long receiveId;
+ synchronized(_recentlyClosed) {
+ Long rcvID;
+ do {
+ receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
+ rcvID = Long.valueOf(receiveId);
+ } while (_recentlyClosed.containsKey(rcvID) ||
+ _connectionByInboundId.containsKey(rcvID));
+ _recentlyClosed.put(rcvID, DUMMY);
+ }
+ return receiveId;
+ }
private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index aab7eb5dc1..b3c7aae5ec 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -52,6 +52,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private int _maxTotalConnsPerDay;
private int _maxConns;
private boolean _disableRejectLog;
+ private String _limitAction;
/** state of a connection */
private enum AckInit {
@@ -122,6 +123,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
/** @since 0.9.4 default false */
public static final String PROP_DISABLE_REJ_LOG = "i2p.streaming.disableRejectLogging";
+ /** @since 0.9.34 reset,drop,http, or custom string, default reset */
+ public static final String PROP_LIMIT_ACTION = "i2p.streaming.limitAction";
private static final int TREND_COUNT = 3;
@@ -136,6 +139,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND;
private static final int DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = 1;
private static final int DEFAULT_SLOW_START_GROWTH_RATE_FACTOR = 1;
+ private static final String DEFAULT_LIMIT_ACTION = "reset";
/**
@@ -347,6 +351,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerHour = opts.getMaxTotalConnsPerHour();
_maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
_maxConns = opts.getMaxConns();
+ _limitAction = opts.getLimitAction();
}
/**
@@ -384,6 +389,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
+ if (opts != null)
+ _limitAction = opts.getProperty(PROP_LIMIT_ACTION, DEFAULT_LIMIT_ACTION);
+ else
+ _limitAction = DEFAULT_LIMIT_ACTION;
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
}
@@ -453,6 +462,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
if (opts.getProperty(PROP_MAX_STREAMS) != null)
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
+ if (opts.getProperty(PROP_LIMIT_ACTION) != null)
+ _limitAction = opts.getProperty(PROP_LIMIT_ACTION);
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
}
@@ -772,6 +783,14 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public Set getAccessList() { return _accessList; }
public Set getBlacklist() { return _blackList; }
+ /**
+ * "reset", "drop", "http", or custom string.
+ * Default "reset".
+ *
+ * @since 0.9.34
+ */
+ public String getLimitAction() { return _limitAction; }
+
private void initLists(ConnectionOptions opts) {
_accessList = opts.getAccessList();
_blackList = opts.getBlacklist();
diff --git a/history.txt b/history.txt
index 051ab2ae29..656cdcf646 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,11 @@
+2018-02-16 zzz
+ * i2psnark: Fix NPE on torrent not found (ticket #2167)
+ * i2ptunnel: Change POST throttle response to 429
+ * Streaming: Configurable response when over conn limits (ticket #2145)
+
2018-02-12 zzz
+ * i2ptunnel: Close sockets
+ * Proxy: Update clearnet user-agent (ticket #2163)
* SusiMail:
- Background email checking (ticket #2087)
- Set Cache-Control header for attachments
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 4e28a2c8d2..c10128fe62 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
- public final static long BUILD = 5;
+ public final static long BUILD = 6;
/** for example "-test" */
public final static String EXTRA = "";