instead of the maxQueuedMessages limit, use the rule 'if any of the messages time out on the queue, its going too slowly'
(this helps in situations where we've got a flash flood of small messages to send)
This commit is contained in:
@ -63,16 +63,12 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
protected SessionKey _key;
|
protected SessionKey _key;
|
||||||
protected ByteArray _extraBytes;
|
protected ByteArray _extraBytes;
|
||||||
protected byte[] _iv;
|
protected byte[] _iv;
|
||||||
protected int _maxQueuedMessages;
|
|
||||||
private long _lastSliceRun;
|
private long _lastSliceRun;
|
||||||
private boolean _closed;
|
private boolean _closed;
|
||||||
private boolean _weInitiated;
|
private boolean _weInitiated;
|
||||||
private long _created;
|
private long _created;
|
||||||
protected RouterContext _context;
|
protected RouterContext _context;
|
||||||
|
|
||||||
public final static String PARAM_MAX_QUEUED_MESSAGES = "i2np.tcp.maxQueuedMessages";
|
|
||||||
private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20;
|
|
||||||
|
|
||||||
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) {
|
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) {
|
||||||
_context = context;
|
_context = context;
|
||||||
_log = context.logManager().getLog(TCPConnection.class);
|
_log = context.logManager().getLog(TCPConnection.class);
|
||||||
@ -101,7 +97,6 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
_remotePort = s.getPort();
|
_remotePort = s.getPort();
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Connected with peer: " + _remoteHost + ":" + _remotePort);
|
_log.info("Connected with peer: " + _remoteHost + ":" + _remotePort);
|
||||||
updateMaxQueuedMessages();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** how long has this connection been around for, or -1 if it isn't established yet */
|
/** how long has this connection been around for, or -1 if it isn't established yet */
|
||||||
@ -114,21 +109,6 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
|
|
||||||
protected boolean weInitiatedConnection() { return _weInitiated; }
|
protected boolean weInitiatedConnection() { return _weInitiated; }
|
||||||
|
|
||||||
private void updateMaxQueuedMessages() {
|
|
||||||
String str = _context.router().getConfigSetting(PARAM_MAX_QUEUED_MESSAGES);
|
|
||||||
if ( (str != null) && (str.trim().length() > 0) ) {
|
|
||||||
try {
|
|
||||||
int max = Integer.parseInt(str);
|
|
||||||
_maxQueuedMessages = max;
|
|
||||||
return;
|
|
||||||
} catch (NumberFormatException nfe) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn("Invalid max queued messages [" + str + "]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_maxQueuedMessages = DEFAULT_MAX_QUEUED_MESSAGES;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RouterIdentity getRemoteRouterIdentity() { return _remoteIdentity; }
|
public RouterIdentity getRemoteRouterIdentity() { return _remoteIdentity; }
|
||||||
int getId() { return _id; }
|
int getId() { return _id; }
|
||||||
int getPendingMessageCount() {
|
int getPendingMessageCount() {
|
||||||
@ -265,20 +245,28 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
long beforeAdd = _context.clock().now();
|
long beforeAdd = _context.clock().now();
|
||||||
StringBuffer pending = new StringBuffer(64);
|
StringBuffer pending = new StringBuffer(64);
|
||||||
synchronized (_toBeSent) {
|
synchronized (_toBeSent) {
|
||||||
if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) {
|
for (int i = 0; i < _toBeSent.size(); i++) {
|
||||||
|
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
|
||||||
|
if (cur.getExpiration() < beforeAdd) {
|
||||||
fail = true;
|
fail = true;
|
||||||
} else {
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!fail) {
|
||||||
_toBeSent.add(msg);
|
_toBeSent.add(msg);
|
||||||
// the ConnectionRunner.processSlice does a wait() until we have messages
|
|
||||||
}
|
}
|
||||||
totalPending = _toBeSent.size();
|
totalPending = _toBeSent.size();
|
||||||
pending.append(totalPending).append(": ");
|
pending.append(totalPending).append(": ");
|
||||||
|
if (fail) {
|
||||||
for (int i = 0; i < totalPending; i++) {
|
for (int i = 0; i < totalPending; i++) {
|
||||||
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
|
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
|
||||||
pending.append(cur.getMessageSize()).append(" byte ");
|
pending.append(cur.getMessageSize()).append(" byte ");
|
||||||
pending.append(cur.getMessageType()).append(" message added");
|
pending.append(cur.getMessageType()).append(" message added");
|
||||||
pending.append(" added ").append(cur.getLifetime()).append(" ms ago, ");
|
pending.append(" added ").append(cur.getLifetime()).append(" ms ago, ");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// the ConnectionRunner.processSlice does a wait() until we have messages
|
||||||
_toBeSent.notifyAll();
|
_toBeSent.notifyAll();
|
||||||
}
|
}
|
||||||
long afterAdd = _context.clock().now();
|
long afterAdd = _context.clock().now();
|
||||||
@ -287,12 +275,12 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
|
|
||||||
if (fail) {
|
if (fail) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64() + ": " + pending.toString());
|
_log.error("messages expired on the queue to " + _remoteIdentity.getHash().toBase64() + ": " + pending.toString());
|
||||||
|
|
||||||
// do we really want to give them a comm error because they're so.damn.slow reading their stream?
|
// do we really want to give them a comm error because they're so.damn.slow reading their stream?
|
||||||
_context.profileManager().commErrorOccurred(_remoteIdentity.getHash());
|
_context.profileManager().commErrorOccurred(_remoteIdentity.getHash());
|
||||||
|
|
||||||
msg.timestamp("TCPConnection.addMessage exceeded max queued");
|
msg.timestamp("TCPConnection.addMessage saw an expired queued message");
|
||||||
_transport.afterSend(msg, false);
|
_transport.afterSend(msg, false);
|
||||||
// should we really be closing a connection if they're that slow?
|
// should we really be closing a connection if they're that slow?
|
||||||
// yeah, i think we should.
|
// yeah, i think we should.
|
||||||
|
Reference in New Issue
Block a user