allow throttles on the number of streams participated in, as well as how a timeout + queue for overflow

This commit is contained in:
jrandom
2004-11-11 21:19:59 +00:00
committed by zzz
parent 9774ded4dd
commit 45b3fecfff
4 changed files with 162 additions and 55 deletions

View File

@ -54,45 +54,40 @@ class ConnectionHandler {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Accept("+ timeoutMs+") called"); _log.debug("Accept("+ timeoutMs+") called");
long expiration = timeoutMs; long expiration = timeoutMs + _context.clock().now();
if (expiration > 0) while (true) {
expiration += _context.clock().now(); if ( (timeoutMs > 0) && (expiration < _context.clock().now()) )
Packet syn = null; return null;
synchronized (_synQueue) { if (!_active)
while ( _active && (_synQueue.size() <= 0) ) { return null;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " + _synQueue.size()); Packet syn = null;
if (timeoutMs <= 0) { synchronized (_synQueue) {
try { _synQueue.wait(); } catch (InterruptedException ie) {} while ( _active && (_synQueue.size() <= 0) ) {
} else { if (_log.shouldLog(Log.DEBUG))
long remaining = expiration - _context.clock().now(); _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: "
if (remaining < 0) + _synQueue.size());
break; if (timeoutMs <= 0) {
try { _synQueue.wait(remaining); } catch (InterruptedException ie) {} try { _synQueue.wait(); } catch (InterruptedException ie) {}
} else {
long remaining = expiration - _context.clock().now();
if (remaining < 0)
break;
try { _synQueue.wait(remaining); } catch (InterruptedException ie) {}
}
}
if (_active && _synQueue.size() > 0) {
syn = (Packet)_synQueue.remove(0);
} }
} }
if (_active && _synQueue.size() > 0) {
syn = (Packet)_synQueue.remove(0); if (syn != null) {
// deal with forged / invalid syn packets
Connection con = _manager.receiveConnection(syn);
if (con != null)
return con;
} }
} // keep looping...
if (syn != null) {
// deal with forged / invalid syn packets
Connection con = _manager.receiveConnection(syn);
if (con != null) {
return con;
} else if (timeoutMs > 0) {
long remaining = expiration - _context.clock().now();
if (remaining <= 0) {
return null;
} else {
return accept(remaining);
}
} else {
return accept(timeoutMs);
}
} else {
return null;
} }
} }

View File

@ -35,9 +35,11 @@ public class ConnectionManager {
/** Ping ID (ByteArray) to PingRequest */ /** Ping ID (ByteArray) to PingRequest */
private Map _pendingPings; private Map _pendingPings;
private boolean _allowIncoming; private boolean _allowIncoming;
private int _maxConcurrentStreams;
private volatile int _numWaiting;
private Object _connectionLock; private Object _connectionLock;
public ConnectionManager(I2PAppContext context, I2PSession session) { public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent) {
_context = context; _context = context;
_log = context.logManager().getLog(ConnectionManager.class); _log = context.logManager().getLog(ConnectionManager.class);
_connectionByInboundId = new HashMap(32); _connectionByInboundId = new HashMap(32);
@ -52,6 +54,8 @@ public class ConnectionManager {
session.setSessionListener(_messageHandler); session.setSessionListener(_messageHandler);
_outboundQueue = new PacketQueue(context, session); _outboundQueue = new PacketQueue(context, session);
_allowIncoming = false; _allowIncoming = false;
_maxConcurrentStreams = maxConcurrent;
_numWaiting = 0;
} }
Connection getConnectionByInboundId(byte[] id) { Connection getConnectionByInboundId(byte[] id) {
@ -77,19 +81,40 @@ public class ConnectionManager {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler); Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler);
byte receiveId[] = new byte[4]; byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId); _context.random().nextBytes(receiveId);
boolean reject = false;
synchronized (_connectionLock) { synchronized (_connectionLock) {
while (true) { if (locked_tooManyStreams()) {
Connection oldCon = (Connection)_connectionByInboundId.put(new ByteArray(receiveId), con); reject = true;
if (oldCon == null) { } else {
break; while (true) {
} else { Connection oldCon = (Connection)_connectionByInboundId.put(new ByteArray(receiveId), con);
_connectionByInboundId.put(new ByteArray(receiveId), oldCon); if (oldCon == null) {
// receiveId already taken, try another break;
_context.random().nextBytes(receiveId); } else {
_connectionByInboundId.put(new ByteArray(receiveId), oldCon);
// receiveId already taken, try another
_context.random().nextBytes(receiveId);
}
} }
} }
} }
if (reject) {
if (_log.shouldLog(Log.WARN))
_log.warn("Refusing connection since we have exceeded our max of "
+ _maxConcurrentStreams + " connections");
PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(synPacket.getSequenceNum());
reply.setSendStreamId(synPacket.getReceiveStreamId());
reply.setReceiveStreamId(null);
reply.setOptionalFrom(_session.getMyDestination());
// this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply);
return null;
}
con.setReceiveStreamId(receiveId); con.setReceiveStreamId(receiveId);
try { try {
con.getPacketHandler().receivePacket(synPacket, con); con.getPacketHandler().receivePacket(synPacket, con);
@ -102,24 +127,59 @@ public class ConnectionManager {
return con; return con;
} }
private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
/** /**
* Build a new connection to the given peer. This blocks if there is no * Build a new connection to the given peer. This blocks if there is no
* connection delay, otherwise it returns immediately. * connection delay, otherwise it returns immediately.
* *
* @return new connection, or null if we have exceeded our limit
*/ */
public Connection connect(Destination peer, ConnectionOptions opts) { public Connection connect(Destination peer, ConnectionOptions opts) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); Connection con = null;
con.setRemotePeer(peer);
byte receiveId[] = new byte[4]; byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId); long expiration = _context.clock().now() + opts.getConnectTimeout();
synchronized (_connectionLock) { if (opts.getConnectTimeout() <= 0)
ByteArray ba = new ByteArray(receiveId); expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
while (_connectionByInboundId.containsKey(ba)) { _numWaiting++;
_context.random().nextBytes(receiveId); while (true) {
if (expiration < _context.clock().now()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Refusing to connect since we have exceeded our max of "
+ _maxConcurrentStreams + " connections");
_numWaiting--;
return null;
} }
_connectionByInboundId.put(ba, con); con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer);
_context.random().nextBytes(receiveId);
boolean reject = false;
synchronized (_connectionLock) {
if (locked_tooManyStreams()) {
// allow a full buffer of pending/waiting streams
if (_numWaiting > _maxConcurrentStreams) {
if (_log.shouldLog(Log.WARN))
_log.warn("Refusing connection since we have exceeded our max of "
+ _maxConcurrentStreams + " and there are " + _numWaiting
+ " waiting already");
_numWaiting--;
return null;
}
reject = true;
} else {
ByteArray ba = new ByteArray(receiveId);
while (_connectionByInboundId.containsKey(ba)) {
_context.random().nextBytes(receiveId);
}
_connectionByInboundId.put(ba, con);
}
}
if (!reject)
break;
} }
// ok we're in...
con.setReceiveStreamId(receiveId); con.setReceiveStreamId(receiveId);
con.eventOccurred(); con.eventOccurred();
@ -127,9 +187,24 @@ public class ConnectionManager {
if (opts.getConnectDelay() <= 0) { if (opts.getConnectDelay() <= 0) {
con.waitForConnect(); con.waitForConnect();
} }
if (_numWaiting > 0)
_numWaiting--;
return con; return con;
} }
private boolean locked_tooManyStreams() {
if (_maxConcurrentStreams <= 0) return false;
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
int active = 0;
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
if (con.getIsConnected())
active++;
}
return (active >= _maxConcurrentStreams);
}
public MessageHandler getMessageHandler() { return _messageHandler; } public MessageHandler getMessageHandler() { return _messageHandler; }
public PacketHandler getPacketHandler() { return _packetHandler; } public PacketHandler getPacketHandler() { return _packetHandler; }
public ConnectionHandler getConnectionHandler() { return _connectionHandler; } public ConnectionHandler getConnectionHandler() { return _connectionHandler; }

View File

@ -36,6 +36,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private ConnectionOptions _defaultOptions; private ConnectionOptions _defaultOptions;
private long _acceptTimeout; private long _acceptTimeout;
private String _name; private String _name;
private int _maxStreams;
private static int __managerId = 0; private static int __managerId = 0;
private ConnectionManager _connectionManager; private ConnectionManager _connectionManager;
@ -54,6 +55,9 @@ public class I2PSocketManagerFull implements I2PSocketManager {
init(context, session, opts, name); init(context, session, opts, name);
} }
/** how many streams will we allow at once? */
public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
/** /**
* *
*/ */
@ -61,7 +65,17 @@ public class I2PSocketManagerFull implements I2PSocketManager {
_context = context; _context = context;
_session = session; _session = session;
_log = _context.logManager().getLog(I2PSocketManagerFull.class); _log = _context.logManager().getLog(I2PSocketManagerFull.class);
_connectionManager = new ConnectionManager(_context, _session);
_maxStreams = -1;
try {
String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
_maxStreams = Integer.parseInt(num);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
_maxStreams = -1;
}
_connectionManager = new ConnectionManager(_context, _session, _maxStreams);
_name = name + " " + (++__managerId); _name = name + " " + (++__managerId);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_defaultOptions = new ConnectionOptions(opts); _defaultOptions = new ConnectionOptions(opts);
@ -140,6 +154,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
else else
opts = new ConnectionOptions(options); opts = new ConnectionOptions(options);
Connection con = _connectionManager.connect(peer, opts); Connection con = _connectionManager.connect(peer, opts);
if (con == null)
throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
I2PSocketFull socket = new I2PSocketFull(con); I2PSocketFull socket = new I2PSocketFull(con);
con.setSocket(socket); con.setSocket(socket);
if (con.getConnectionError() != null) { if (con.getConnectionError() != null) {

View File

@ -0,0 +1,21 @@
package net.i2p.client.streaming;
import net.i2p.I2PException;
/**
* We attempted to have more open streams than we are willing to put up with
*
*/
public class TooManyStreamsException extends I2PException {
public TooManyStreamsException(String message, Throwable parent) {
super(message, parent);
}
public TooManyStreamsException(String message) {
super(message);
}
public TooManyStreamsException() {
super();
}
}