Bound and concurrentify SYN queue to hopefully prevent explosion

This commit is contained in:
zzz
2009-02-04 14:32:09 +00:00
parent 69f051da41
commit a6dc27adaf

View File

@ -1,5 +1,7 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -10,24 +12,36 @@ import net.i2p.util.SimpleTimer;
/** /**
* Receive new connection attempts * Receive new connection attempts
*
* Use a bounded queue to limit the damage from SYN floods,
* router overload, or a slow client
*
* @author zzz modded to use concurrent and bound queue size
*/ */
class ConnectionHandler { class ConnectionHandler {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private ConnectionManager _manager; private ConnectionManager _manager;
private List _synQueue; private LinkedBlockingQueue<Packet> _synQueue;
private boolean _active; private boolean _active;
private int _acceptTimeout; private int _acceptTimeout;
/** max time after receiveNewSyn() and before the matched accept() */ /** max time after receiveNewSyn() and before the matched accept() */
private static final int DEFAULT_ACCEPT_TIMEOUT = 3*1000; private static final int DEFAULT_ACCEPT_TIMEOUT = 3*1000;
/**
* This is both SYNs and subsequent packets, and with an initial window size of 12,
* this is a backlog of 5 to 64 Syns, which seems like plenty for now
* Don't make this too big because the removal by all the TimeoutSyns is O(n**2) - sortof.
*/
private static final int MAX_QUEUE_SIZE = 64;
/** Creates a new instance of ConnectionHandler */ /** Creates a new instance of ConnectionHandler */
public ConnectionHandler(I2PAppContext context, ConnectionManager mgr) { public ConnectionHandler(I2PAppContext context, ConnectionManager mgr) {
_context = context; _context = context;
_log = context.logManager().getLog(ConnectionHandler.class); _log = context.logManager().getLog(ConnectionHandler.class);
_manager = mgr; _manager = mgr;
_synQueue = new ArrayList(5); _synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
_active = false; _active = false;
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
} }
@ -35,9 +49,11 @@ class ConnectionHandler {
public void setActive(boolean active) { public void setActive(boolean active) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("setActive(" + active + ") called"); _log.debug("setActive(" + active + ") called");
synchronized (_synQueue) { _active = active;
_active = active; if (!active) {
_synQueue.notifyAll(); // so we break from the accept() try {
_synQueue.put(new PoisonPacket()); // so we break from the accept() - waits until space is available
} catch (InterruptedException ie) {}
} }
} }
public boolean getActive() { return _active; } public boolean getActive() { return _active; }
@ -45,6 +61,11 @@ class ConnectionHandler {
/** /**
* Non-SYN packets with a zero SendStreamID may also be queued here so * Non-SYN packets with a zero SendStreamID may also be queued here so
* that they don't get thrown away while the SYN packet before it is queued. * that they don't get thrown away while the SYN packet before it is queued.
*
* Additional overload protection may be required here...
* We don't have a 3-way handshake, so the SYN fully opens a connection.
* Does that make us more or less vulnerable to SYN flooding?
*
*/ */
public void receiveNewSyn(Packet packet) { public void receiveNewSyn(Packet packet) {
if (!_active) { if (!_active) {
@ -55,10 +76,15 @@ class ConnectionHandler {
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout); _log.debug("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout);
SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); // also check if expiration of the head is long past for overload detection with peek() ?
synchronized (_synQueue) { boolean success = _synQueue.offer(packet); // fail immediately if full
_synQueue.add(packet); if (success) {
_synQueue.notifyAll(); SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full");
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
sendReset(packet);
} }
} }
@ -82,41 +108,44 @@ class ConnectionHandler {
return null; return null;
if (!_active) { if (!_active) {
// fail all the ones we had queued up // fail all the ones we had queued up
synchronized (_synQueue) { while(true) {
for (int i = 0; i < _synQueue.size(); i++) { Packet packet = _synQueue.poll(); // fails immediately if empty
Packet packet = (Packet)_synQueue.get(i); if (packet == null || packet.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST)
sendReset(packet); break;
} sendReset(packet);
_synQueue.clear();
} }
return null; return null;
} }
Packet syn = null; Packet syn = null;
synchronized (_synQueue) { while ( _active && syn == null) {
while ( _active && (_synQueue.size() <= 0) ) { if (_log.shouldLog(Log.DEBUG))
if (_log.shouldLog(Log.DEBUG)) _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: "
_log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " + _synQueue.size());
+ _synQueue.size()); if (timeoutMs <= 0) {
if (timeoutMs <= 0) { try {
try { _synQueue.wait(); } catch (InterruptedException ie) {} syn = _synQueue.take(); // waits forever
} else { } catch (InterruptedException ie) {}
long remaining = expiration - _context.clock().now(); } else {
// BUGFIX long remaining = expiration - _context.clock().now();
// The specified amount of real time has elapsed, more or less. // (dont think this applies anymore for LinkedBlockingQueue)
// If timeout is zero, however, then real time is not taken into consideration // BUGFIX
// and the thread simply waits until notified. // The specified amount of real time has elapsed, more or less.
if (remaining < 1) // If timeout is zero, however, then real time is not taken into consideration
break; // and the thread simply waits until notified.
try { _synQueue.wait(remaining); } catch (InterruptedException ie) {} if (remaining < 1)
} break;
} try {
if (_active && _synQueue.size() > 0) { syn = _synQueue.poll(remaining, TimeUnit.MILLISECONDS); // waits the specified time max
syn = (Packet)_synQueue.remove(0); } catch (InterruptedException ie) {}
break;
} }
} }
if (syn != null) { if (syn != null) {
if (syn.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST)
return null;
// deal with forged / invalid syn packets // deal with forged / invalid syn packets
// Handle both SYN and non-SYN packets in the queue // Handle both SYN and non-SYN packets in the queue
@ -179,10 +208,7 @@ class ConnectionHandler {
} }
public void timeReached() { public void timeReached() {
boolean removed = false; boolean removed = _synQueue.remove(_synPacket);
synchronized (_synQueue) {
removed = _synQueue.remove(_synPacket);
}
if (removed) { if (removed) {
if (_synPacket.isFlagSet(Packet.FLAG_SYNCHRONIZE)) if (_synPacket.isFlagSet(Packet.FLAG_SYNCHRONIZE))
@ -196,4 +222,17 @@ class ConnectionHandler {
} }
} }
} }
/**
* Simple end-of-queue marker.
* The standard class limits the delay to MAX_DELAY_REQUEST so
* an evil user can't use this to shut us down
*/
private static class PoisonPacket extends Packet {
public static final int MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1;
public PoisonPacket() {
setOptionalDelay(MAX_DELAY_REQUEST);
}
}
} }