if the send queue to the peer is too large, fail the message but also mark it as a comm error (since either their net con is insanely saturated, or disconnected)
logging
This commit is contained in:
@ -17,7 +17,6 @@ import java.math.BigInteger;
|
|||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import net.i2p.crypto.AESInputStream;
|
import net.i2p.crypto.AESInputStream;
|
||||||
@ -256,14 +255,22 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
int totalPending = 0;
|
int totalPending = 0;
|
||||||
boolean fail = false;
|
boolean fail = false;
|
||||||
long beforeAdd = _context.clock().now();
|
long beforeAdd = _context.clock().now();
|
||||||
|
StringBuffer pending = new StringBuffer(64);
|
||||||
synchronized (_toBeSent) {
|
synchronized (_toBeSent) {
|
||||||
if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) {
|
if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) {
|
||||||
fail = true;
|
fail = true;
|
||||||
} else {
|
} else {
|
||||||
_toBeSent.add(msg);
|
_toBeSent.add(msg);
|
||||||
totalPending = _toBeSent.size();
|
|
||||||
// the ConnectionRunner.processSlice does a wait() until we have messages
|
// the ConnectionRunner.processSlice does a wait() until we have messages
|
||||||
}
|
}
|
||||||
|
totalPending = _toBeSent.size();
|
||||||
|
if (fail) {
|
||||||
|
pending.append(totalPending).append(": ");
|
||||||
|
for (int i = 0; i < totalPending; i++) {
|
||||||
|
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
|
||||||
|
pending.append(cur.getMessage().getClass().getName()).append(" ");
|
||||||
|
}
|
||||||
|
}
|
||||||
_toBeSent.notifyAll();
|
_toBeSent.notifyAll();
|
||||||
}
|
}
|
||||||
long afterAdd = _context.clock().now();
|
long afterAdd = _context.clock().now();
|
||||||
@ -272,8 +279,11 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
|
|
||||||
if (fail) {
|
if (fail) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.ERROR))
|
||||||
_log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64() + ": " + totalPending);
|
_log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64() + ": " + pending.toString());
|
||||||
|
|
||||||
|
// do we really want to give them a comm error because they're so.damn.slow reading their stream?
|
||||||
|
_context.profileManager().commErrorOccurred(_remoteIdentity.getHash());
|
||||||
|
|
||||||
msg.timestamp("TCPConnection.addMessage exceeded max queued");
|
msg.timestamp("TCPConnection.addMessage exceeded max queued");
|
||||||
_transport.afterSend(msg, false);
|
_transport.afterSend(msg, false);
|
||||||
return;
|
return;
|
||||||
@ -281,8 +291,9 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
|
|
||||||
long diff = afterAdd - beforeAdd;
|
long diff = afterAdd - beforeAdd;
|
||||||
if (diff > 500) {
|
if (diff > 500) {
|
||||||
if (_log.shouldLog(Log.ERROR))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.error("Lock contention adding a message: " + diff + "ms");
|
_log.warn("Lock contention adding a message: " + diff + "ms to "
|
||||||
|
+ _remoteIdentity.getHash().toBase64() + ": " + totalPending);
|
||||||
}
|
}
|
||||||
|
|
||||||
msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify");
|
msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify");
|
||||||
@ -437,7 +448,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
|
|
||||||
OutNetMessage msg = null;
|
OutNetMessage msg = null;
|
||||||
int remaining = 0;
|
int remaining = 0;
|
||||||
List timedOut = new LinkedList();
|
List timedOut = null;
|
||||||
|
|
||||||
synchronized (_toBeSent) {
|
synchronized (_toBeSent) {
|
||||||
// loop through, dropping expired messages, waiting until a non-expired
|
// loop through, dropping expired messages, waiting until a non-expired
|
||||||
@ -453,21 +464,24 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
|||||||
msg = (OutNetMessage)_toBeSent.remove(0);
|
msg = (OutNetMessage)_toBeSent.remove(0);
|
||||||
remaining--;
|
remaining--;
|
||||||
if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) {
|
if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) {
|
||||||
|
if (timedOut == null) timedOut = new ArrayList(4);
|
||||||
timedOut.add(msg);
|
timedOut.add(msg);
|
||||||
msg = null; // keep looking
|
msg = null; // keep looking
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Iterator iter = timedOut.iterator(); iter.hasNext(); ) {
|
if (timedOut != null) {
|
||||||
OutNetMessage failed = (OutNetMessage)iter.next();
|
for (int i = 0; i < timedOut.size(); i++) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
OutNetMessage failed = (OutNetMessage)timedOut.get(i);
|
||||||
_log.warn("Message timed out while sitting on the TCP Connection's queue! was too slow by: "
|
if (_log.shouldLog(Log.WARN))
|
||||||
+ (start-msg.getExpiration()) + "ms to "
|
_log.warn("Message timed out while sitting on the TCP Connection's queue! was too slow by: "
|
||||||
+ _remoteIdentity.getHash().toBase64() + ": " + msg);
|
+ (start-msg.getExpiration()) + "ms to "
|
||||||
msg.timestamp("TCPConnection.runner.processSlice expired");
|
+ _remoteIdentity.getHash().toBase64() + ": " + msg);
|
||||||
_transport.afterSend(msg, false);
|
msg.timestamp("TCPConnection.runner.processSlice expired");
|
||||||
return true;
|
_transport.afterSend(msg, false);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (remaining > 0) {
|
if (remaining > 0) {
|
||||||
|
Reference in New Issue
Block a user