get rid of the whole slice concept
dont time out for too many messages (just time out individual ones) however, if any of the messages that time out have been there for a minute, kill the con (since its hung) kaffe workaround for fast closing sockets
This commit is contained in:
@ -37,7 +37,7 @@ import net.i2p.util.Log;
|
||||
class RestrictiveTCPConnection extends TCPConnection {
|
||||
private Log _log;
|
||||
|
||||
public RestrictiveTCPConnection(RouterContext context, Socket s, boolean locallyInitiated) {
|
||||
public RestrictiveTCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException {
|
||||
super(context, s, locallyInitiated);
|
||||
_log = context.logManager().getLog(RestrictiveTCPConnection.class);
|
||||
_context.statManager().createRateStat("tcp.establishConnectionTime", "How long does it take for us to successfully establish a connection (either locally or remotely initiated)?", "TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
|
@ -63,13 +63,12 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
protected SessionKey _key;
|
||||
protected ByteArray _extraBytes;
|
||||
protected byte[] _iv;
|
||||
private long _lastSliceRun;
|
||||
private boolean _closed;
|
||||
private boolean _weInitiated;
|
||||
private long _created;
|
||||
protected RouterContext _context;
|
||||
|
||||
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) {
|
||||
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) throws IOException {
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(TCPConnection.class);
|
||||
_context.statManager().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added?",
|
||||
@ -88,13 +87,16 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
}
|
||||
_builder = new DHSessionKeyBuilder();
|
||||
_extraBytes = null;
|
||||
_lastSliceRun = -1;
|
||||
|
||||
// sun keeps the socket's InetAddress around after its been closed, but kaffe (and the rest of classpath)
|
||||
// doesn't, so we've got to check & cache it here if we want to log it later. (kaffe et al are acting per
|
||||
// spec, btw)
|
||||
_remoteHost = s.getInetAddress() + "";
|
||||
_remotePort = s.getPort();
|
||||
try {
|
||||
_remoteHost = s.getInetAddress() + "";
|
||||
_remotePort = s.getPort();
|
||||
} catch (NullPointerException npe) {
|
||||
throw new IOException("kaffe is being picky since the socket closed too fast...");
|
||||
}
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Connected with peer: " + _remoteHost + ":" + _remotePort);
|
||||
}
|
||||
@ -238,18 +240,32 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
|
||||
public void setTransport(TCPTransport trans) { _transport = trans; }
|
||||
|
||||
/** dont bitch about expiring messages if they don't even last 60 seconds */
|
||||
private static final long MIN_MESSAGE_LIFETIME_FOR_PENALTY = 60*1000;
|
||||
|
||||
public void addMessage(OutNetMessage msg) {
|
||||
msg.timestamp("TCPConnection.addMessage");
|
||||
int totalPending = 0;
|
||||
boolean fail = false;
|
||||
long beforeAdd = _context.clock().now();
|
||||
StringBuffer pending = new StringBuffer(64);
|
||||
List removed = null;
|
||||
synchronized (_toBeSent) {
|
||||
for (int i = 0; i < _toBeSent.size(); i++) {
|
||||
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
|
||||
if (cur.getExpiration() < beforeAdd) {
|
||||
fail = true;
|
||||
break;
|
||||
if (cur.getLifetime() > MIN_MESSAGE_LIFETIME_FOR_PENALTY) {
|
||||
fail = true;
|
||||
break;
|
||||
} else {
|
||||
// yeah, it expired, so drop it, but it wasn't our
|
||||
// fault (since it was almost expired when we got it
|
||||
if (removed == null)
|
||||
removed = new ArrayList(2);
|
||||
removed.add(cur);
|
||||
_toBeSent.remove(i);
|
||||
i--;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!fail) {
|
||||
@ -266,13 +282,24 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
}
|
||||
}
|
||||
|
||||
// the ConnectionRunner.processSlice does a wait() until we have messages
|
||||
// the ConnectionRunner.getNext does a wait() until we have messages
|
||||
_toBeSent.notifyAll();
|
||||
}
|
||||
long afterAdd = _context.clock().now();
|
||||
|
||||
_context.statManager().addRateData("tcp.queueSize", totalPending-1, 0);
|
||||
|
||||
if (removed != null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("messages expired on the queue to " + _remoteIdentity.getHash().toBase64()
|
||||
+ " but they weren't that old: " + removed.size());
|
||||
for (int i = 0; i < removed.size(); i++) {
|
||||
OutNetMessage cur = (OutNetMessage)removed.get(i);
|
||||
msg.timestamp("TCPConnection.addMessage expired but not our fault");
|
||||
_transport.afterSend(cur, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (fail) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("messages expired on the queue to " + _remoteIdentity.getHash().toBase64() + ": " + pending.toString());
|
||||
@ -285,35 +312,23 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
// should we really be closing a connection if they're that slow?
|
||||
// yeah, i think we should.
|
||||
closeConnection();
|
||||
return;
|
||||
}
|
||||
|
||||
long diff = afterAdd - beforeAdd;
|
||||
if (diff > 500) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Lock contention adding a message: " + diff + "ms to "
|
||||
+ _remoteIdentity.getHash().toBase64() + ": " + totalPending);
|
||||
}
|
||||
} else {
|
||||
|
||||
msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify");
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Add message with toBeSent.size = " + totalPending + " to " + _remoteIdentity.getHash().toBase64());
|
||||
if (totalPending <= 0) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("WTF, total pending after adding " + msg.getMessage().getClass().getName() + " <= 0! " + msg);
|
||||
}
|
||||
|
||||
if (slicesTooLong()) {
|
||||
msg.timestamp("TCPTransport.addMessage noticed slices were taking too long");
|
||||
|
||||
if (_log.shouldLog(Log.ERROR)) {
|
||||
long sliceTime = _context.clock().now()-_lastSliceRun;
|
||||
_log.error("onAdd: Slices are taking too long (" + sliceTime
|
||||
+ "ms) - perhaps the remote side is disconnected or hung? remote="
|
||||
+ _remoteIdentity.getHash().toBase64() + " pending: " + pending.toString());
|
||||
long diff = afterAdd - beforeAdd;
|
||||
if (diff > 500) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Lock contention adding a message: " + diff + "ms to "
|
||||
+ _remoteIdentity.getHash().toBase64() + ": " + totalPending);
|
||||
}
|
||||
|
||||
msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify");
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Add message with toBeSent.size = " + totalPending + " to " + _remoteIdentity.getHash().toBase64());
|
||||
if (totalPending <= 0) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("WTF, total pending after adding " + msg.getMessage().getClass().getName() + " <= 0! " + msg);
|
||||
}
|
||||
closeConnection();
|
||||
}
|
||||
}
|
||||
|
||||
@ -385,16 +400,6 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("How did we read a message that can't be written to memory...", ioe);
|
||||
}
|
||||
|
||||
if (slicesTooLong()) {
|
||||
if (_log.shouldLog(Log.ERROR)) {
|
||||
long sliceTime = _context.clock().now()-_lastSliceRun;
|
||||
_log.error("onReceive: Slices are taking too long (" + sliceTime
|
||||
+ "ms) - perhaps the remote side is disconnected or hung? peer = "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
}
|
||||
closeConnection();
|
||||
}
|
||||
}
|
||||
|
||||
public void readError(I2NPMessageReader reader, Exception error) {
|
||||
@ -404,183 +409,162 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
|
||||
_log.warn("Error reading from stream to " + _remoteIdentity.getHash().toBase64(), error);
|
||||
}
|
||||
|
||||
/**
|
||||
* if a slice takes 2 minutes, fuck 'im. slices at most send one I2NPMessage,
|
||||
* which can be up to around 32KB currently. Basically a minimum 273bps (slower for
|
||||
* larger messages - perhaps this min-throughput should be implemented on the
|
||||
* output stream as part of the throttling code? hmmm)
|
||||
*/
|
||||
private final static long MAX_SLICE_DURATION = 60*1000;
|
||||
/**
|
||||
* Determine if the connection runner is hanging while running its slices. This can
|
||||
* occur if there's a broken TCP connection that hasn't timed out yet (3 minutes later..)
|
||||
* or if the other side's router is just b0rked and isn't .read()ing from its socket anymore.
|
||||
* In either case, if this is true then the connection should be closed. Given the new threading / slice
|
||||
* model, a slice that doesn't do anything will take 30-ish seconds (all in .wait())
|
||||
*
|
||||
*/
|
||||
private boolean slicesTooLong() {
|
||||
if (_lastSliceRun <= 0) return false;
|
||||
synchronized (_toBeSent) {
|
||||
// if there's nothing pending, dont worry about it
|
||||
if (_toBeSent.size() <= 0)
|
||||
return false;
|
||||
}
|
||||
long diff = _context.clock().now() - _lastSliceRun;
|
||||
boolean tooLong = diff > MAX_SLICE_DURATION;
|
||||
if (tooLong)
|
||||
_log.warn("Slices are taking " + diff + "ms");
|
||||
return (diff > MAX_SLICE_DURATION);
|
||||
}
|
||||
|
||||
class ConnectionRunner implements Runnable {
|
||||
private boolean _running;
|
||||
public void run() {
|
||||
_running = true;
|
||||
while (_running) {
|
||||
long startSlice = _context.clock().now();
|
||||
_lastSliceRun = startSlice;
|
||||
boolean processOk = processSlice();
|
||||
if (!processOk) {
|
||||
closeConnection();
|
||||
return;
|
||||
OutNetMessage nextMessage = getNext();
|
||||
if (nextMessage != null) {
|
||||
boolean sent = doSend(nextMessage);
|
||||
if (!sent) {
|
||||
_running = false;
|
||||
}
|
||||
}
|
||||
long endSlice = _context.clock().now();
|
||||
}
|
||||
|
||||
closeConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a slice (push a message if available).
|
||||
*
|
||||
* @return true if the operation succeeded, false if there was a critical error
|
||||
*/
|
||||
private boolean processSlice() {
|
||||
long start = _context.clock().now();
|
||||
|
||||
|
||||
private OutNetMessage getNext() {
|
||||
OutNetMessage msg = null;
|
||||
int remaining = 0;
|
||||
List timedOut = null;
|
||||
|
||||
synchronized (_toBeSent) {
|
||||
// loop through, dropping expired messages, waiting until a non-expired
|
||||
// one is added, or 10 seconds have passed (catchall in case things bork)
|
||||
while (msg == null) {
|
||||
while (msg == null) {
|
||||
synchronized (_toBeSent) {
|
||||
if (_toBeSent.size() <= 0) {
|
||||
try {
|
||||
_toBeSent.wait(10*1000);
|
||||
_toBeSent.wait();
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
remaining = _toBeSent.size();
|
||||
if (remaining <= 0) return true;
|
||||
msg = (OutNetMessage)_toBeSent.remove(0);
|
||||
remaining--;
|
||||
if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) {
|
||||
if (timedOut == null) timedOut = new ArrayList(4);
|
||||
timedOut.add(msg);
|
||||
msg = null; // keep looking
|
||||
|
||||
boolean ancientFound = locked_expireOldMessages();
|
||||
if (ancientFound) {
|
||||
_running = false;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (_toBeSent.size() > 0) {
|
||||
msg = (OutNetMessage)_toBeSent.remove(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fail any messages that have expired on the queue
|
||||
*
|
||||
* @return true if any of the messages expired are really really old
|
||||
* (indicating a hung connection)
|
||||
*/
|
||||
private boolean locked_expireOldMessages() {
|
||||
long now = _context.clock().now();
|
||||
List timedOut = null;
|
||||
for (int i = 0; i < _toBeSent.size(); i++) {
|
||||
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
|
||||
if (cur.getExpiration() < now) {
|
||||
if (timedOut == null)
|
||||
timedOut = new ArrayList(2);
|
||||
timedOut.add(cur);
|
||||
_toBeSent.remove(i);
|
||||
i--;
|
||||
}
|
||||
}
|
||||
|
||||
boolean reallySlowFound = false;
|
||||
|
||||
if (timedOut != null) {
|
||||
for (int i = 0; i < timedOut.size(); i++) {
|
||||
OutNetMessage failed = (OutNetMessage)timedOut.get(i);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Message timed out while sitting on the TCP Connection's queue! was too slow by: "
|
||||
+ (start-msg.getExpiration()) + "ms to "
|
||||
+ _remoteIdentity.getHash().toBase64() + ": " + msg);
|
||||
msg.timestamp("TCPConnection.runner.processSlice expired");
|
||||
_transport.afterSend(msg, false);
|
||||
return true;
|
||||
_log.warn("Message " + i + "/" + timedOut.size()
|
||||
+ " timed out while sitting on the TCP Connection's queue! was too slow by: "
|
||||
+ (now-failed.getExpiration()) + "ms to "
|
||||
+ _remoteIdentity.getHash().toBase64() + ": " + failed);
|
||||
failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired");
|
||||
_transport.afterSend(failed, false);
|
||||
if (failed.getLifetime() >= MIN_MESSAGE_LIFETIME_FOR_PENALTY)
|
||||
reallySlowFound = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (remaining > 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("After pulling off a pending message, there are still " + remaining +
|
||||
" messages queued up for sending to " + _remoteIdentity.getHash().toBase64());
|
||||
}
|
||||
|
||||
long afterExpire = _context.clock().now();
|
||||
|
||||
if (msg != null) {
|
||||
msg.timestamp("TCPConnection.runner.processSlice fetched");
|
||||
//_log.debug("Processing slice - msg to be sent");
|
||||
|
||||
byte data[] = msg.getMessageData();
|
||||
if (data == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("message " + msg.getMessageType() + "/" + msg.getMessageId() + "expired before it could be sent");
|
||||
_transport.afterSend(msg, false, false);
|
||||
return true;
|
||||
}
|
||||
msg.timestamp("TCPConnection.runner.processSlice before sending "
|
||||
+ data.length + " bytes");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Sending " + data.length + " bytes in the slice... to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
|
||||
long exp = msg.getMessage().getMessageExpiration().getTime();
|
||||
|
||||
long beforeWrite = 0;
|
||||
try {
|
||||
synchronized (_out) {
|
||||
beforeWrite = _context.clock().now();
|
||||
_out.write(data);
|
||||
_out.flush();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("IO error writing out a " + data.length + " byte message to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
return false;
|
||||
}
|
||||
|
||||
long end = _context.clock().now();
|
||||
long timeLeft = exp - end;
|
||||
|
||||
msg.timestamp("TCPConnection.runner.processSlice sent and flushed");
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Message " + msg.getMessageType()
|
||||
+ " (expiring in " + timeLeft + "ms) sent to "
|
||||
+ _remoteIdentity.getHash().toBase64() + " from "
|
||||
+ _context.routerHash().toBase64()
|
||||
+ " over connection " + _id + " with " + data.length
|
||||
+ " bytes in " + (end - afterExpire) + "ms (write took "
|
||||
+ (end - beforeWrite) + "ms, prepare took "
|
||||
+ (beforeWrite - afterExpire) + "ms)");
|
||||
if (timeLeft < 2*1000) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.warn("Very little time left... time to send [" + (end-start)
|
||||
+ "] time left [" + timeLeft + "] to "
|
||||
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(),
|
||||
msg.getCreatedBy());
|
||||
}
|
||||
|
||||
long lifetime = msg.getLifetime();
|
||||
if (lifetime > 10*1000) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("The processing of the message took way too long (" + lifetime
|
||||
+ "ms) - time to send (" + (end-start) + "), time left (" + timeLeft
|
||||
+ ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
|
||||
}
|
||||
_transport.afterSend(msg, true);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Processing slice - message sent completely: "
|
||||
+ msg.getMessageSize() + " byte " + msg.getMessageType() + " message to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
if (end - afterExpire > 1000) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Actual sending took too long ( " + (end-afterExpire)
|
||||
+ "ms) sending " + data.length + " bytes to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return reallySlowFound;
|
||||
}
|
||||
|
||||
/**
|
||||
* send the message
|
||||
*
|
||||
* @return true if the message was sent ok, false if the connection b0rked
|
||||
*/
|
||||
private boolean doSend(OutNetMessage msg) {
|
||||
msg.timestamp("TCPConnection.runner.doSend fetched");
|
||||
long afterExpire = _context.clock().now();
|
||||
|
||||
byte data[] = msg.getMessageData();
|
||||
if (data == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("message " + msg.getMessageType() + "/" + msg.getMessageId()
|
||||
+ " expired before it could be sent");
|
||||
_transport.afterSend(msg, false, false);
|
||||
return true;
|
||||
}
|
||||
msg.timestamp("TCPConnection.runner.doSend before sending "
|
||||
+ data.length + " bytes");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Sending " + data.length + " bytes to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
|
||||
long exp = msg.getMessage().getMessageExpiration().getTime();
|
||||
|
||||
long beforeWrite = 0;
|
||||
try {
|
||||
synchronized (_out) {
|
||||
beforeWrite = _context.clock().now();
|
||||
_out.write(data);
|
||||
_out.flush();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("IO error writing out a " + data.length + " byte message to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
return false;
|
||||
}
|
||||
|
||||
long end = _context.clock().now();
|
||||
long timeLeft = exp - end;
|
||||
|
||||
msg.timestamp("TCPConnection.runner.doSend sent and flushed");
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Message " + msg.getMessageType()
|
||||
+ " (expiring in " + timeLeft + "ms) sent to "
|
||||
+ _remoteIdentity.getHash().toBase64() + " from "
|
||||
+ _context.routerHash().toBase64()
|
||||
+ " over connection " + _id + " with " + data.length
|
||||
+ " bytes in " + (end - afterExpire) + "ms (write took "
|
||||
+ (end - beforeWrite) + "ms, prepare took "
|
||||
+ (beforeWrite - afterExpire) + "ms)");
|
||||
|
||||
long lifetime = msg.getLifetime();
|
||||
if (lifetime > 10*1000) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("The processing of the message took way too long (" + lifetime
|
||||
+ "ms) - time left (" + timeLeft + ") to "
|
||||
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
|
||||
}
|
||||
_transport.afterSend(msg, true);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("doSend - message sent completely: "
|
||||
+ msg.getMessageSize() + " byte " + msg.getMessageType() + " message to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
if (end - afterExpire > 1000) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Actual sending took too long ( " + (end-afterExpire)
|
||||
+ "ms) sending " + data.length + " bytes to "
|
||||
+ _remoteIdentity.getHash().toBase64());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public void stopRunning() {
|
||||
_running = false;
|
||||
// stop the wait(...)
|
||||
|
Reference in New Issue
Block a user