logging, formatting

This commit is contained in:
jrandom
2004-04-22 03:09:03 +00:00
committed by zzz
parent 4fe7105e2f
commit afeecdf4af

View File

@ -79,311 +79,325 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20; private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20;
static { static {
StatManager.getInstance().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added?", "TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); StatManager.getInstance().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added?",
"TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
} }
public TCPConnection(Socket s, RouterIdentity myIdent, SigningPrivateKey signingKey, boolean locallyInitiated) { public TCPConnection(Socket s, RouterIdentity myIdent, SigningPrivateKey signingKey, boolean locallyInitiated) {
_id = ++_idCounter; _id = ++_idCounter;
_weInitiated = locallyInitiated; _weInitiated = locallyInitiated;
_closed = false; _closed = false;
_socket = s; _socket = s;
_myIdentity = myIdent; _myIdentity = myIdent;
_signingKey = signingKey; _signingKey = signingKey;
_created = -1; _created = -1;
_toBeSent = new ArrayList(); _toBeSent = new ArrayList();
try { try {
_in = _socket.getInputStream(); _in = _socket.getInputStream();
_out = _socket.getOutputStream(); _out = _socket.getOutputStream();
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error getting streams for the connection", ioe); _log.error("Error getting streams for the connection", ioe);
} }
_builder = new DHSessionKeyBuilder(); _builder = new DHSessionKeyBuilder();
_extraBytes = null; _extraBytes = null;
_lastSliceRun = -1; _lastSliceRun = -1;
if (_log.shouldLog(Log.INFO))
_log.info("Connected with peer: " + s.getInetAddress() + ":" + s.getPort()); _log.info("Connected with peer: " + s.getInetAddress() + ":" + s.getPort());
updateMaxQueuedMessages(); updateMaxQueuedMessages();
} }
/** how long has this connection been around for, or -1 if it isn't established yet */ /** how long has this connection been around for, or -1 if it isn't established yet */
public long getLifetime() { public long getLifetime() {
if (_created > 0) if (_created > 0)
return Clock.getInstance().now() - _created; return Clock.getInstance().now() - _created;
else else
return -1; return -1;
} }
protected boolean weInitiatedConnection() { return _weInitiated; } protected boolean weInitiatedConnection() { return _weInitiated; }
private void updateMaxQueuedMessages() { private void updateMaxQueuedMessages() {
String str = Router.getInstance().getConfigSetting(PARAM_MAX_QUEUED_MESSAGES); String str = Router.getInstance().getConfigSetting(PARAM_MAX_QUEUED_MESSAGES);
if ( (str != null) && (str.trim().length() > 0) ) { if ( (str != null) && (str.trim().length() > 0) ) {
try { try {
int max = Integer.parseInt(str); int max = Integer.parseInt(str);
_maxQueuedMessages = max; _maxQueuedMessages = max;
return; return;
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
_log.warn("Invalid max queued messages [" + str + "]"); if (_log.shouldLog(Log.WARN))
} _log.warn("Invalid max queued messages [" + str + "]");
} }
_maxQueuedMessages = DEFAULT_MAX_QUEUED_MESSAGES; }
_maxQueuedMessages = DEFAULT_MAX_QUEUED_MESSAGES;
} }
public RouterIdentity getRemoteRouterIdentity() { return _remoteIdentity; } public RouterIdentity getRemoteRouterIdentity() { return _remoteIdentity; }
int getId() { return _id; } int getId() { return _id; }
int getPendingMessageCount() { int getPendingMessageCount() {
synchronized (_toBeSent) { synchronized (_toBeSent) {
return _toBeSent.size(); return _toBeSent.size();
} }
} }
protected void exchangeKey() throws IOException, DataFormatException { protected void exchangeKey() throws IOException, DataFormatException {
BigInteger myPub = _builder.getMyPublicValue(); BigInteger myPub = _builder.getMyPublicValue();
byte myPubBytes[] = myPub.toByteArray(); byte myPubBytes[] = myPub.toByteArray();
DataHelper.writeLong(_out, 2, myPubBytes.length); DataHelper.writeLong(_out, 2, myPubBytes.length);
_out.write(myPubBytes); _out.write(myPubBytes);
int rlen = (int)DataHelper.readLong(_in, 2); int rlen = (int)DataHelper.readLong(_in, 2);
byte peerPubBytes[] = new byte[rlen]; byte peerPubBytes[] = new byte[rlen];
int read = DataHelper.read(_in, peerPubBytes); int read = DataHelper.read(_in, peerPubBytes);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("rlen: " + rlen + " peerBytes: " + DataHelper.toString(peerPubBytes) + " read: " + read); _log.debug("rlen: " + rlen + " peerBytes: " + DataHelper.toString(peerPubBytes) + " read: " + read);
BigInteger peerPub = new BigInteger(1, peerPubBytes); BigInteger peerPub = new BigInteger(1, peerPubBytes);
_builder.setPeerPublicValue(peerPub); _builder.setPeerPublicValue(peerPub);
_key = _builder.getSessionKey(); _key = _builder.getSessionKey();
_extraBytes = _builder.getExtraBytes(); _extraBytes = _builder.getExtraBytes();
_iv = new byte[16]; _iv = new byte[16];
System.arraycopy(_extraBytes.getData(), 0, _iv, 0, Math.min(_extraBytes.getData().length, _iv.length)); System.arraycopy(_extraBytes.getData(), 0, _iv, 0, Math.min(_extraBytes.getData().length, _iv.length));
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session key: " + _key.toBase64() + " extra bytes: " + _extraBytes.getData().length); _log.debug("Session key: " + _key.toBase64() + " extra bytes: " + _extraBytes.getData().length);
} }
protected boolean identifyStationToStation() throws IOException, DataFormatException { protected boolean identifyStationToStation() throws IOException, DataFormatException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(512); ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
_myIdentity.writeBytes(baos); _myIdentity.writeBytes(baos);
Hash keyHash = SHA256Generator.getInstance().calculateHash(_key.getData()); Hash keyHash = SHA256Generator.getInstance().calculateHash(_key.getData());
keyHash.writeBytes(baos); keyHash.writeBytes(baos);
Signature sig = DSAEngine.getInstance().sign(baos.toByteArray(), _signingKey); Signature sig = DSAEngine.getInstance().sign(baos.toByteArray(), _signingKey);
sig.writeBytes(baos); sig.writeBytes(baos);
byte encr[] = AESEngine.getInstance().safeEncrypt(baos.toByteArray(), _key, _iv, 1024); byte encr[] = AESEngine.getInstance().safeEncrypt(baos.toByteArray(), _key, _iv, 1024);
DataHelper.writeLong(_out, 2, encr.length); DataHelper.writeLong(_out, 2, encr.length);
_out.write(encr); _out.write(encr);
// we've identified ourselves, now read who they are // we've identified ourselves, now read who they are
int rlen = (int)DataHelper.readLong(_in, 2);
byte pencr[] = new byte[rlen];
int read = DataHelper.read(_in, pencr);
if (read != rlen)
throw new DataFormatException("Not enough data in peer ident");
byte decr[] = AESEngine.getInstance().safeDecrypt(pencr, _key, _iv);
if (decr == null)
throw new DataFormatException("Unable to decrypt - failed exchange?");
int rlen = (int)DataHelper.readLong(_in, 2); ByteArrayInputStream bais = new ByteArrayInputStream(decr);
byte pencr[] = new byte[rlen]; _remoteIdentity = new RouterIdentity();
int read = DataHelper.read(_in, pencr); _remoteIdentity.readBytes(bais);
if (read != rlen) Hash peerKeyHash = new Hash();
throw new DataFormatException("Not enough data in peer ident"); peerKeyHash.readBytes(bais);
byte decr[] = AESEngine.getInstance().safeDecrypt(pencr, _key, _iv);
if (decr == null)
throw new DataFormatException("Unable to decrypt - failed exchange?");
ByteArrayInputStream bais = new ByteArrayInputStream(decr); if (!peerKeyHash.equals(keyHash)) {
_remoteIdentity = new RouterIdentity(); if (_log.shouldLog(Log.ERROR))
_remoteIdentity.readBytes(bais); _log.error("Peer tried to spoof!");
Hash peerKeyHash = new Hash(); return false;
peerKeyHash.readBytes(bais); }
if (!peerKeyHash.equals(keyHash)) { Signature rsig = new Signature();
if (_log.shouldLog(Log.ERROR)) rsig.readBytes(bais);
_log.error("Peer tried to spoof!"); byte signedData[] = new byte[decr.length - rsig.getData().length];
return false; System.arraycopy(decr, 0, signedData, 0, signedData.length);
} return DSAEngine.getInstance().verifySignature(rsig, signedData, _remoteIdentity.getSigningPublicKey());
Signature rsig = new Signature();
rsig.readBytes(bais);
byte signedData[] = new byte[decr.length - rsig.getData().length];
System.arraycopy(decr, 0, signedData, 0, signedData.length);
return DSAEngine.getInstance().verifySignature(rsig, signedData, _remoteIdentity.getSigningPublicKey());
} }
protected final static int ESTABLISHMENT_TIMEOUT = 10*1000; // 10 second lag (not necessarily for the entire establish) protected final static int ESTABLISHMENT_TIMEOUT = 10*1000; // 10 second lag (not necessarily for the entire establish)
public RouterIdentity establishConnection() { public RouterIdentity establishConnection() {
BigInteger myPub = _builder.getMyPublicValue(); BigInteger myPub = _builder.getMyPublicValue();
try { try {
_socket.setSoTimeout(ESTABLISHMENT_TIMEOUT); _socket.setSoTimeout(ESTABLISHMENT_TIMEOUT);
exchangeKey(); exchangeKey();
// key exchanged. now say who we are and prove it // key exchanged. now say who we are and prove it
boolean ok = identifyStationToStation(); boolean ok = identifyStationToStation();
if (!ok) if (!ok)
throw new DataFormatException("Station to station identification failed! MITM?"); throw new DataFormatException("Station to station identification failed! MITM?");
else { else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("TCP connection " + _id + " established with " + _remoteIdentity.getHash().toBase64()); _log.info("TCP connection " + _id + " established with "
_in = new AESInputStream(new BandwidthLimitedInputStream(_in, _remoteIdentity), _key, _iv); + _remoteIdentity.getHash().toBase64());
_out = new AESOutputStream(new BandwidthLimitedOutputStream(_out, _remoteIdentity), _key, _iv); _in = new AESInputStream(new BandwidthLimitedInputStream(_in, _remoteIdentity), _key, _iv);
_socket.setSoTimeout(0); _out = new AESOutputStream(new BandwidthLimitedOutputStream(_out, _remoteIdentity), _key, _iv);
established(); _socket.setSoTimeout(0);
return _remoteIdentity; established();
} return _remoteIdentity;
} catch (IOException ioe) { }
if (_log.shouldLog(Log.ERROR)) } catch (IOException ioe) {
_log.error("Error establishing connection", ioe); if (_log.shouldLog(Log.ERROR))
closeConnection(); _log.error("Error establishing connection", ioe);
return null; closeConnection();
} catch (DataFormatException dfe) { return null;
if (_log.shouldLog(Log.ERROR)) } catch (DataFormatException dfe) {
_log.error("Error establishing connection", dfe); if (_log.shouldLog(Log.ERROR))
closeConnection(); _log.error("Error establishing connection", dfe);
return null; closeConnection();
} catch (Throwable t) { return null;
if (_log.shouldLog(Log.ERROR)) } catch (Throwable t) {
_log.error("jrandom is paranoid so we're catching it all during establishConnection", t); if (_log.shouldLog(Log.ERROR))
closeConnection(); _log.error("jrandom is paranoid so we're catching it all during establishConnection", t);
return null; closeConnection();
} return null;
}
} }
protected void established() { _created = Clock.getInstance().now(); } protected void established() { _created = Clock.getInstance().now(); }
public void runConnection() { public void runConnection() {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Run connection"); log.debug("Run connection");
_runner = new ConnectionRunner(); _runner = new ConnectionRunner();
Thread t = new I2PThread(_runner); Thread t = new I2PThread(_runner);
t.setName("Run Conn [" + _id + "]"); t.setName("Run Conn [" + _id + "]");
t.setDaemon(true); t.setDaemon(true);
t.start(); t.start();
_reader = new I2NPMessageReader(_in, this, "TCP Read [" + _id + "]"); _reader = new I2NPMessageReader(_in, this, "TCP Read [" + _id + "]");
_reader.startReading(); _reader.startReading();
} }
public void setTransport(TCPTransport trans) { _transport = trans; } public void setTransport(TCPTransport trans) { _transport = trans; }
public void addMessage(OutNetMessage msg) { public void addMessage(OutNetMessage msg) {
msg.timestamp("TCPConnection.addMessage"); msg.timestamp("TCPConnection.addMessage");
int totalPending = 0; int totalPending = 0;
boolean fail = false; boolean fail = false;
long beforeAdd = Clock.getInstance().now(); long beforeAdd = Clock.getInstance().now();
synchronized (_toBeSent) { synchronized (_toBeSent) {
if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) { if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) {
fail = true; fail = true;
} else { } else {
_toBeSent.add(msg); _toBeSent.add(msg);
totalPending = _toBeSent.size(); totalPending = _toBeSent.size();
// the ConnectionRunner.processSlice does a wait() until we have messages // the ConnectionRunner.processSlice does a wait() until we have messages
} }
_toBeSent.notifyAll(); _toBeSent.notifyAll();
} }
long afterAdd = Clock.getInstance().now(); long afterAdd = Clock.getInstance().now();
StatManager.getInstance().addRateData("tcp.queueSize", totalPending-1, 0); StatManager.getInstance().addRateData("tcp.queueSize", totalPending-1, 0);
if (fail) { if (fail) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("too many queued messages"); _log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64());
msg.timestamp("TCPConnection.addMessage exceeded max queued"); msg.timestamp("TCPConnection.addMessage exceeded max queued");
_transport.afterSend(msg, false); _transport.afterSend(msg, false);
return; return;
} }
long diff = afterAdd - beforeAdd; long diff = afterAdd - beforeAdd;
if (diff > 500) { if (diff > 500) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Lock contention adding a message: " + diff + "ms"); _log.error("Lock contention adding a message: " + diff + "ms");
} }
msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify"); msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Add message with toBeSent.size = " + totalPending + " to " + _remoteIdentity.getHash().toBase64()); _log.debug("Add message with toBeSent.size = " + totalPending + " to " + _remoteIdentity.getHash().toBase64());
if (totalPending <= 0) { if (totalPending <= 0) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("WTF, total pending after adding " + msg.getMessage().getClass().getName() + " <= 0! " + msg); _log.error("WTF, total pending after adding " + msg.getMessage().getClass().getName() + " <= 0! " + msg);
} }
if (slicesTooLong()) { if (slicesTooLong()) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR)) {
_log.error("onAdd: Slices are taking too long (" + (Clock.getInstance().now()-_lastSliceRun) +"ms) - perhaps the remote side is disconnected or hung? remote=" + _remoteIdentity.getHash().toBase64()); long sliceTime = Clock.getInstance().now()-_lastSliceRun;
closeConnection(); _log.error("onAdd: Slices are taking too long (" + sliceTime
} + "ms) - perhaps the remote side is disconnected or hung? remote="
+ _remoteIdentity.getHash().toBase64());
}
closeConnection();
}
} }
void closeConnection() { void closeConnection() {
if (_closed) return; if (_closed) return;
_closed = true; _closed = true;
if (_remoteIdentity != null) { if (_remoteIdentity != null) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Closing the connection to " + _remoteIdentity.getHash().toBase64(), new Exception("Closed by")); _log.warn("Closing the connection to " + _remoteIdentity.getHash().toBase64(),
} else { new Exception("Closed by"));
if (_socket != null) { } else {
if (_log.shouldLog(Log.WARN)) if (_socket != null) {
_log.warn("Closing the unestablished connection with " + _socket.getInetAddress().toString() + ":" + _socket.getPort(), new Exception("Closed by")); if (_log.shouldLog(Log.WARN))
} else { _log.warn("Closing the unestablished connection with "
if (_log.shouldLog(Log.WARN)) + _socket.getInetAddress().toString() + ":"
_log.warn("Closing the unestablished connection", new Exception("Closed by")); + _socket.getPort(), new Exception("Closed by"));
} } else {
} if (_log.shouldLog(Log.WARN))
if (_reader != null) _reader.stopReading(); _log.warn("Closing the unestablished connection", new Exception("Closed by"));
if (_runner != null) _runner.stopRunning(); }
if (_in != null) try { _in.close(); } catch (IOException ioe) {} }
if (_out != null) try { _out.close(); } catch (IOException ioe) {} if (_reader != null) _reader.stopReading();
if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} if (_runner != null) _runner.stopRunning();
if (_toBeSent != null) { if (_in != null) try { _in.close(); } catch (IOException ioe) {}
long now = Clock.getInstance().now(); if (_out != null) try { _out.close(); } catch (IOException ioe) {}
synchronized (_toBeSent) { if (_socket != null) try { _socket.close(); } catch (IOException ioe) {}
for (Iterator iter = _toBeSent.iterator(); iter.hasNext(); ) { if (_toBeSent != null) {
OutNetMessage msg = (OutNetMessage)iter.next(); long now = Clock.getInstance().now();
msg.timestamp("TCPTransport.closeConnection caused fail"); synchronized (_toBeSent) {
if (_log.shouldLog(Log.WARN)) for (Iterator iter = _toBeSent.iterator(); iter.hasNext(); ) {
_log.warn("Connection closed while the message was sitting on the TCP Connection's queue! too slow by: " + (now-msg.getExpiration()) + "ms: " + msg); OutNetMessage msg = (OutNetMessage)iter.next();
_transport.afterSend(msg, false); msg.timestamp("TCPTransport.closeConnection caused fail");
} if (_log.shouldLog(Log.WARN))
_toBeSent.clear(); _log.warn("Connection closed while the message was sitting on the TCP Connection's queue! too slow by: "
} + (now-msg.getExpiration()) + "ms: " + msg);
} _transport.afterSend(msg, false);
_transport.connectionClosed(this); }
_toBeSent.clear();
}
}
_transport.connectionClosed(this);
} }
List getPendingMessages() { return _toBeSent; } List getPendingMessages() { return _toBeSent; }
public void disconnected(I2NPMessageReader reader) { public void disconnected(I2NPMessageReader reader) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Remote disconnected: " + _remoteIdentity.getHash().toBase64()); _log.warn("Remote disconnected: " + _remoteIdentity.getHash().toBase64());
closeConnection(); closeConnection();
} }
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToReceive) { public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToReceive) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Message received from " + _remoteIdentity.getHash().toBase64()); _log.debug("Message received from " + _remoteIdentity.getHash().toBase64());
try { try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(32*1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(32*1024);
message.writeBytes(baos); message.writeBytes(baos);
int size = baos.size(); int size = baos.size();
// this is called by the I2NPMessageReader's thread, so it delays the reading from this peer only // this is called by the I2NPMessageReader's thread, so it delays the reading from this peer only
//_log.debug("Delaying inbound for size " + size); //_log.debug("Delaying inbound for size " + size);
//BandwidthLimiter.getInstance().delayInbound(_remoteIdentity, size); //BandwidthLimiter.getInstance().delayInbound(_remoteIdentity, size);
_transport.messageReceived(message, _remoteIdentity, null, msToReceive, size); _transport.messageReceived(message, _remoteIdentity, null, msToReceive, size);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("How did we read a message that is poorly formatted...", dfe); _log.warn("How did we read a message that is poorly formatted...", dfe);
} catch (IOException ioe) { } catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("How did we read a message that can't be written to memory...", ioe); _log.warn("How did we read a message that can't be written to memory...", ioe);
} }
if (slicesTooLong()) { if (slicesTooLong()) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR)) {
_log.error("onReceive: Slices are taking too long (" + (Clock.getInstance().now()-_lastSliceRun) +"ms) - perhaps the remote side is disconnected or hung? peer = " + _remoteIdentity.getHash().toBase64()); long sliceTime = Clock.getInstance().now()-_lastSliceRun;
closeConnection(); _log.error("onReceive: Slices are taking too long (" + sliceTime
} + "ms) - perhaps the remote side is disconnected or hung? peer = "
+ _remoteIdentity.getHash().toBase64());
}
closeConnection();
}
} }
public void readError(I2NPMessageReader reader, Exception error) { public void readError(I2NPMessageReader reader, Exception error) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Error reading from stream to " + _remoteIdentity.getHash().toBase64()); _log.error("Error reading from stream to " + _remoteIdentity.getHash().toBase64());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Error reading from stream to " + _remoteIdentity.getHash().toBase64(), error); _log.debug("Error reading from stream to " + _remoteIdentity.getHash().toBase64(), error);
} }
/** /**
@ -402,132 +416,148 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
* *
*/ */
private boolean slicesTooLong() { private boolean slicesTooLong() {
if (_lastSliceRun <= 0) return false; if (_lastSliceRun <= 0) return false;
long diff = Clock.getInstance().now() - _lastSliceRun; long diff = Clock.getInstance().now() - _lastSliceRun;
return (diff > MAX_SLICE_DURATION); return (diff > MAX_SLICE_DURATION);
} }
class ConnectionRunner implements Runnable { class ConnectionRunner implements Runnable {
private boolean _running; private boolean _running;
public void run() { public void run() {
_running = true; _running = true;
try { try {
while (_running) { while (_running) {
long startSlice = Clock.getInstance().now(); long startSlice = Clock.getInstance().now();
_lastSliceRun = startSlice; _lastSliceRun = startSlice;
processSlice(); processSlice();
long endSlice = Clock.getInstance().now(); long endSlice = Clock.getInstance().now();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Connection runner failed with an IO exception to " + _remoteIdentity.getHash().toBase64(), ioe); _log.error("Connection runner failed with an IO exception to "
closeConnection(); + _remoteIdentity.getHash().toBase64(), ioe);
} catch (Throwable t) { closeConnection();
if (_log.shouldLog(Log.ERROR)) } catch (Throwable t) {
_log.error("Somehow we ran into an uncaught exception running the connection!", t); if (_log.shouldLog(Log.ERROR))
closeConnection(); _log.error("Somehow we ran into an uncaught exception running the connection!", t);
} closeConnection();
} }
}
private void processSlice() throws IOException { private void processSlice() throws IOException {
long start = Clock.getInstance().now(); long start = Clock.getInstance().now();
OutNetMessage msg = null; OutNetMessage msg = null;
int remaining = 0; int remaining = 0;
List timedOut = new LinkedList(); List timedOut = new LinkedList();
synchronized (_toBeSent) { synchronized (_toBeSent) {
// loop through, dropping expired messages, waiting until a non-expired // loop through, dropping expired messages, waiting until a non-expired
// one is added, or 30 seconds have passed (catchall in case things bork) // one is added, or 30 seconds have passed (catchall in case things bork)
while (msg == null) { while (msg == null) {
if (_toBeSent.size() <= 0) { if (_toBeSent.size() <= 0) {
try { try {
_toBeSent.wait(30*1000); _toBeSent.wait(30*1000);
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
remaining = _toBeSent.size(); remaining = _toBeSent.size();
if (remaining <= 0) return; if (remaining <= 0) return;
msg = (OutNetMessage)_toBeSent.remove(0); msg = (OutNetMessage)_toBeSent.remove(0);
remaining--; remaining--;
if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) { if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) {
timedOut.add(msg); timedOut.add(msg);
msg = null; // keep looking msg = null; // keep looking
} }
} }
} }
for (Iterator iter = timedOut.iterator(); iter.hasNext(); ) { for (Iterator iter = timedOut.iterator(); iter.hasNext(); ) {
OutNetMessage failed = (OutNetMessage)iter.next(); OutNetMessage failed = (OutNetMessage)iter.next();
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Message timed out while sitting on the TCP Connection's queue! was too slow by: " + (start-msg.getExpiration()) + "ms to " + _remoteIdentity.getHash().toBase64() + ": " + msg); _log.warn("Message timed out while sitting on the TCP Connection's queue! was too slow by: "
msg.timestamp("TCPConnection.runner.processSlice expired"); + (start-msg.getExpiration()) + "ms to "
_transport.afterSend(msg, false); + _remoteIdentity.getHash().toBase64() + ": " + msg);
return; msg.timestamp("TCPConnection.runner.processSlice expired");
} _transport.afterSend(msg, false);
return;
}
if (remaining > 0) { if (remaining > 0) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("After pulling off a pending message, there are still " + remaining + _log.info("After pulling off a pending message, there are still " + remaining +
" messages queued up for sending to " + _remoteIdentity.getHash().toBase64()); " messages queued up for sending to " + _remoteIdentity.getHash().toBase64());
} }
long afterExpire = Clock.getInstance().now(); long afterExpire = Clock.getInstance().now();
if (msg != null) { if (msg != null) {
msg.timestamp("TCPConnection.runner.processSlice fetched"); msg.timestamp("TCPConnection.runner.processSlice fetched");
//_log.debug("Processing slice - msg to be sent"); //_log.debug("Processing slice - msg to be sent");
try { try {
byte data[] = msg.getMessageData(); byte data[] = msg.getMessageData();
msg.timestamp("TCPConnection.runner.processSlice before sending " + data.length + " bytes"); msg.timestamp("TCPConnection.runner.processSlice before sending "
if (_log.shouldLog(Log.DEBUG)) + data.length + " bytes");
_log.debug("Sending " + data.length + " bytes in the slice... to " + _remoteIdentity.getHash().toBase64()); if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + data.length + " bytes in the slice... to "
+ _remoteIdentity.getHash().toBase64());
synchronized (_out) { synchronized (_out) {
_out.write(data); _out.write(data);
_out.flush(); _out.flush();
} }
msg.timestamp("TCPConnection.runner.processSlice sent and flushed"); msg.timestamp("TCPConnection.runner.processSlice sent and flushed");
long end = Clock.getInstance().now(); long end = Clock.getInstance().now();
long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end; long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Message " + msg.getMessage().getClass().getName() + " (expiring in " + timeLeft + "ms) sent to " + _remoteIdentity.getHash().toBase64() + " from " + _myIdentity.getHash().toBase64() + _log.info("Message " + msg.getMessage().getClass().getName()
" over connection " + _id + " with " + data.length + " bytes in " + (end - start) + "ms"); + " (expiring in " + timeLeft + "ms) sent to "
if (timeLeft < 10*1000) { + _remoteIdentity.getHash().toBase64() + " from "
if (_log.shouldLog(Log.DEBUG)) + _myIdentity.getHash().toBase64()
_log.warn("Very little time left... time to send [" + (end-start) + "] time left [" + timeLeft + "] to " + + " over connection " + _id + " with " + data.length
_remoteIdentity.getHash().toBase64() + "\n" + msg.toString(), msg.getCreatedBy()); + " bytes in " + (end - start) + "ms");
} if (timeLeft < 10*1000) {
if (_log.shouldLog(Log.DEBUG))
_log.warn("Very little time left... time to send [" + (end-start)
+ "] time left [" + timeLeft + "] to "
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(),
msg.getCreatedBy());
}
long lifetime = msg.getLifetime(); long lifetime = msg.getLifetime();
if (lifetime > 10*1000) { if (lifetime > 10*1000) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("The processing of the message took way too long (" + lifetime + "ms) - time to send (" + (end-start) + "), time left (" + timeLeft + ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString()); _log.warn("The processing of the message took way too long (" + lifetime
} + "ms) - time to send (" + (end-start) + "), time left (" + timeLeft
_transport.afterSend(msg, true); + ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
}
_transport.afterSend(msg, true);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing slice - message sent completely: " + msg.getMessage().getClass().getName() + " to " + _remoteIdentity.getHash().toBase64()); _log.debug("Processing slice - message sent completely: "
if (end - afterExpire > 1000) { + msg.getMessage().getClass().getName() + " to "
if (_log.shouldLog(Log.WARN)) + _remoteIdentity.getHash().toBase64());
_log.warn("Actual sending took too long ( " + (end-afterExpire) + "ms) sending " + data.length + " bytes to " + _remoteIdentity.getHash().toBase64()); if (end - afterExpire > 1000) {
} if (_log.shouldLog(Log.WARN))
} catch (IOException ioe) { _log.warn("Actual sending took too long ( " + (end-afterExpire)
msg.timestamp("TCPConnection.runner.processSlice failed to send/flushflushed"); + "ms) sending " + data.length + " bytes to "
_transport.afterSend(msg, false); + _remoteIdentity.getHash().toBase64());
throw ioe; }
} } catch (IOException ioe) {
} msg.timestamp("TCPConnection.runner.processSlice failed to send/flushflushed");
} _transport.afterSend(msg, false);
throw ioe;
}
}
}
public void stopRunning() { public void stopRunning() {
_running = false; _running = false;
// stop the wait(...) // stop the wait(...)
synchronized (_toBeSent) { synchronized (_toBeSent) {
_toBeSent.notifyAll(); _toBeSent.notifyAll();
} }
} }
} }
} }