forked from I2P_Developers/i2p.i2p
Streaming: configurable response when over connection limits (ticket #2145)
Drop when way over limits. Default to HTTP 429 for HTTP Server tunnels Increase recently-closed cache size
This commit is contained in:
@ -70,6 +70,16 @@ class ConnThrottler {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if individual count is over the limit by this much. Does not increment.
|
||||
* @since 0.9.34
|
||||
*/
|
||||
boolean isOverBy(Hash h, int over) {
|
||||
if (_max > 0)
|
||||
return this.counter.count(h) > _max + over;
|
||||
return false;
|
||||
}
|
||||
|
||||
private class Cleaner implements SimpleTimer.TimedEvent {
|
||||
public void timeReached() {
|
||||
if (_totalMax > 0)
|
||||
|
@ -12,6 +12,7 @@ import net.i2p.I2PAppContext;
|
||||
import net.i2p.I2PException;
|
||||
import net.i2p.client.I2PSession;
|
||||
import net.i2p.data.ByteArray;
|
||||
import net.i2p.data.DataHelper;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.data.Hash;
|
||||
import net.i2p.data.SessionKey;
|
||||
@ -61,6 +62,21 @@ class ConnectionManager {
|
||||
public static final String PROP_BLACKLIST = "i2p.streaming.blacklist";
|
||||
private static final long MAX_PING_TIMEOUT = 5*60*1000;
|
||||
private static final int MAX_PONG_PAYLOAD = 32;
|
||||
/** once over throttle limits, respond this many times before just dropping */
|
||||
private static final int DROP_OVER_LIMIT = 3;
|
||||
|
||||
// TODO https://stackoverflow.com/questions/16022624/examples-of-http-api-rate-limiting-http-response-headers
|
||||
private static final String LIMIT_HTTP_RESPONSE =
|
||||
"HTTP/1.1 429 Denied\r\n"+
|
||||
"Content-Type: text/html; charset=iso-8859-1\r\n"+
|
||||
"Cache-Control: no-cache\r\n"+
|
||||
"Connection: close\r\n"+
|
||||
"Proxy-Connection: close\r\n"+
|
||||
"\r\n"+
|
||||
"<html><head><title>429 Denied</title></head>"+
|
||||
"<body><h3>429 Denied</h3>" +
|
||||
"<p>Denied due to excessive requests. Please try again later." +
|
||||
"</body></html>";
|
||||
|
||||
/**
|
||||
* Manage all conns for this session
|
||||
@ -89,7 +105,7 @@ class ConnectionManager {
|
||||
int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
|
||||
_session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort());
|
||||
_outboundQueue = new PacketQueue(_context, _timer);
|
||||
_recentlyClosed = new LHMCache<Long, Object>(32);
|
||||
_recentlyClosed = new LHMCache<Long, Object>(64);
|
||||
/** Socket timeout for accept() */
|
||||
_soTimeout = -1;
|
||||
|
||||
@ -217,9 +233,6 @@ class ConnectionManager {
|
||||
ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
|
||||
opts.setPort(synPacket.getRemotePort());
|
||||
opts.setLocalPort(synPacket.getLocalPort());
|
||||
Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
|
||||
_timer, _outboundQueue, _conPacketHandler, opts, true);
|
||||
_tcbShare.updateOptsFromShare(con);
|
||||
boolean reject = false;
|
||||
int active = 0;
|
||||
int total = 0;
|
||||
@ -243,8 +256,6 @@ class ConnectionManager {
|
||||
_log.logAlways(Log.WARN, "Refusing connection since peer is " + why +
|
||||
(synPacket.getOptionalFrom() == null ? "" : ": " + synPacket.getOptionalFrom().toBase32()));
|
||||
reject = true;
|
||||
} else {
|
||||
assignReceiveStreamId(con);
|
||||
}
|
||||
}
|
||||
|
||||
@ -254,40 +265,72 @@ class ConnectionManager {
|
||||
Destination from = synPacket.getOptionalFrom();
|
||||
if (from == null)
|
||||
return null;
|
||||
if (_dayThrottler != null || _hourThrottler != null) {
|
||||
Hash h = from.calculateHash();
|
||||
if ((_hourThrottler != null && _hourThrottler.isThrottled(h)) ||
|
||||
(_dayThrottler != null && _dayThrottler.isThrottled(h)) ||
|
||||
_globalBlacklist.contains(h) ||
|
||||
(_defaultOptions.isAccessListEnabled() && !_defaultOptions.getAccessList().contains(h)) ||
|
||||
(_defaultOptions.isBlacklistEnabled() && _defaultOptions.getBlacklist().contains(h))) {
|
||||
// A signed RST packet + ElGamal + session tags is fairly expensive, so
|
||||
// once the hour/day limit is hit for a particular peer, don't even send it.
|
||||
// Ditto for blacklist / whitelist
|
||||
// This is a tradeoff, because it will keep retransmitting the SYN for a while,
|
||||
// thus more inbound, but let's not spend several KB on the outbound.
|
||||
if (!Boolean.valueOf(_context.getProperty("i2p.streaming.sendResetOnBlock"))) {
|
||||
// this is the default. Set property to send reset for debugging.
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Dropping RST to " + h);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
String resp = _defaultOptions.getLimitAction();
|
||||
if ("drop".equals(resp)) {
|
||||
// always drop
|
||||
return null;
|
||||
}
|
||||
Hash h = from.calculateHash();
|
||||
if (_globalBlacklist.contains(h) ||
|
||||
(_defaultOptions.isAccessListEnabled() && !_defaultOptions.getAccessList().contains(h)) ||
|
||||
(_defaultOptions.isBlacklistEnabled() && _defaultOptions.getBlacklist().contains(h))) {
|
||||
// always drop these regardless of setting
|
||||
return null;
|
||||
}
|
||||
|
||||
if ((_minuteThrottler != null && _minuteThrottler.isOverBy(h, DROP_OVER_LIMIT)) ||
|
||||
(_hourThrottler != null && _hourThrottler.isOverBy(h, DROP_OVER_LIMIT)) ||
|
||||
(_dayThrottler != null && _dayThrottler.isOverBy(h, DROP_OVER_LIMIT))) {
|
||||
// A signed RST/close packet + ElGamal + session tags is fairly expensive, so
|
||||
// once a limit is significantly exceeded for a particular peer, don't even send it.
|
||||
// This is a tradeoff, because it will keep retransmitting the SYN for a while,
|
||||
// thus more inbound, but let's not spend several KB on the outbound.
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Dropping limit response to " + from.toBase32());
|
||||
return null;
|
||||
}
|
||||
|
||||
boolean reset = resp == null || resp.equals("reset") || resp.length() <= 0;
|
||||
boolean http = !reset && "http".equals(resp);
|
||||
boolean custom = !(reset || http);
|
||||
String sendResponse;
|
||||
if (http) {
|
||||
sendResponse = LIMIT_HTTP_RESPONSE;
|
||||
} else if (custom) {
|
||||
sendResponse = resp.replace("\\r", "\r").replace("\\n", "\n");
|
||||
} else {
|
||||
sendResponse = null;
|
||||
}
|
||||
|
||||
PacketLocal reply = new PacketLocal(_context, from, synPacket.getSession());
|
||||
reply.setFlag(Packet.FLAG_RESET);
|
||||
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
|
||||
if (sendResponse != null) {
|
||||
reply.setFlag(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE | Packet.FLAG_SIGNATURE_INCLUDED);
|
||||
reply.setSequenceNum(0);
|
||||
ByteArray payload = new ByteArray(DataHelper.getUTF8(sendResponse));
|
||||
reply.setPayload(payload);
|
||||
} else {
|
||||
reply.setFlag(Packet.FLAG_RESET | Packet.FLAG_SIGNATURE_INCLUDED);
|
||||
}
|
||||
reply.setAckThrough(synPacket.getSequenceNum());
|
||||
reply.setSendStreamId(synPacket.getReceiveStreamId());
|
||||
reply.setReceiveStreamId(0);
|
||||
long rcvStreamId = assignRejectId();
|
||||
reply.setReceiveStreamId(rcvStreamId);
|
||||
reply.setOptionalFrom();
|
||||
reply.setLocalPort(synPacket.getLocalPort());
|
||||
reply.setRemotePort(synPacket.getRemotePort());
|
||||
if (_log.shouldInfo())
|
||||
//_log.info("Over limit, sending " + (sendResponse != null ? "configured response" : "reset") + " to " + from.toBase32());
|
||||
_log.info("Over limit, sending " + reply + " to " + from.toBase32());
|
||||
// this just sends the packet - no retries or whatnot
|
||||
_outboundQueue.enqueue(reply);
|
||||
return null;
|
||||
}
|
||||
|
||||
Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
|
||||
_timer, _outboundQueue, _conPacketHandler, opts, true);
|
||||
_tcbShare.updateOptsFromShare(con);
|
||||
assignReceiveStreamId(con);
|
||||
|
||||
// finally, we know enough that we can log the packet with the conn filled in
|
||||
if (I2PSocketManagerFull.pcapWriter != null &&
|
||||
_context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP))
|
||||
@ -388,6 +431,26 @@ class ConnectionManager {
|
||||
}
|
||||
return receiveId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick a new random stream ID that we are rejecting,
|
||||
* taking care to avoid duplicates, and return it.
|
||||
*
|
||||
* @since 0.9.34
|
||||
*/
|
||||
private long assignRejectId() {
|
||||
long receiveId;
|
||||
synchronized(_recentlyClosed) {
|
||||
Long rcvID;
|
||||
do {
|
||||
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
|
||||
rcvID = Long.valueOf(receiveId);
|
||||
} while (_recentlyClosed.containsKey(rcvID) ||
|
||||
_connectionByInboundId.containsKey(rcvID));
|
||||
_recentlyClosed.put(rcvID, DUMMY);
|
||||
}
|
||||
return receiveId;
|
||||
}
|
||||
|
||||
private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
|
||||
|
||||
|
@ -52,6 +52,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
private int _maxTotalConnsPerDay;
|
||||
private int _maxConns;
|
||||
private boolean _disableRejectLog;
|
||||
private String _limitAction;
|
||||
|
||||
/** state of a connection */
|
||||
private enum AckInit {
|
||||
@ -122,6 +123,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
|
||||
/** @since 0.9.4 default false */
|
||||
public static final String PROP_DISABLE_REJ_LOG = "i2p.streaming.disableRejectLogging";
|
||||
/** @since 0.9.34 reset,drop,http, or custom string, default reset */
|
||||
public static final String PROP_LIMIT_ACTION = "i2p.streaming.limitAction";
|
||||
|
||||
|
||||
private static final int TREND_COUNT = 3;
|
||||
@ -136,6 +139,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND;
|
||||
private static final int DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = 1;
|
||||
private static final int DEFAULT_SLOW_START_GROWTH_RATE_FACTOR = 1;
|
||||
private static final String DEFAULT_LIMIT_ACTION = "reset";
|
||||
|
||||
|
||||
/**
|
||||
@ -347,6 +351,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
_maxTotalConnsPerHour = opts.getMaxTotalConnsPerHour();
|
||||
_maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
|
||||
_maxConns = opts.getMaxConns();
|
||||
_limitAction = opts.getLimitAction();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -384,6 +389,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
|
||||
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
|
||||
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
|
||||
if (opts != null)
|
||||
_limitAction = opts.getProperty(PROP_LIMIT_ACTION, DEFAULT_LIMIT_ACTION);
|
||||
else
|
||||
_limitAction = DEFAULT_LIMIT_ACTION;
|
||||
|
||||
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
|
||||
}
|
||||
@ -453,6 +462,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
|
||||
if (opts.getProperty(PROP_MAX_STREAMS) != null)
|
||||
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
|
||||
if (opts.getProperty(PROP_LIMIT_ACTION) != null)
|
||||
_limitAction = opts.getProperty(PROP_LIMIT_ACTION);
|
||||
|
||||
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
|
||||
}
|
||||
@ -772,6 +783,14 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
public Set<Hash> getAccessList() { return _accessList; }
|
||||
public Set<Hash> getBlacklist() { return _blackList; }
|
||||
|
||||
/**
|
||||
* "reset", "drop", "http", or custom string.
|
||||
* Default "reset".
|
||||
*
|
||||
* @since 0.9.34
|
||||
*/
|
||||
public String getLimitAction() { return _limitAction; }
|
||||
|
||||
private void initLists(ConnectionOptions opts) {
|
||||
_accessList = opts.getAccessList();
|
||||
_blackList = opts.getBlacklist();
|
||||
|
Reference in New Issue
Block a user