diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index e0d26a1a41..185ce9de7c 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -79,311 +79,325 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20; static { - StatManager.getInstance().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added?", "TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("tcp.queueSize", "How many messages were already in the queue when a new message was added?", + "TCP Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); } public TCPConnection(Socket s, RouterIdentity myIdent, SigningPrivateKey signingKey, boolean locallyInitiated) { - _id = ++_idCounter; - _weInitiated = locallyInitiated; - _closed = false; - _socket = s; - _myIdentity = myIdent; - _signingKey = signingKey; - _created = -1; - _toBeSent = new ArrayList(); - try { - _in = _socket.getInputStream(); - _out = _socket.getOutputStream(); - } catch (IOException ioe) { - _log.error("Error getting streams for the connection", ioe); - } - _builder = new DHSessionKeyBuilder(); - _extraBytes = null; - _lastSliceRun = -1; - - - _log.info("Connected with peer: " + s.getInetAddress() + ":" + s.getPort()); - updateMaxQueuedMessages(); + _id = ++_idCounter; + _weInitiated = locallyInitiated; + _closed = false; + _socket = s; + _myIdentity = myIdent; + _signingKey = signingKey; + _created = -1; + _toBeSent = new ArrayList(); + try { + _in = _socket.getInputStream(); + _out = _socket.getOutputStream(); + } catch (IOException ioe) { + _log.error("Error getting streams for the connection", ioe); + } + _builder = new DHSessionKeyBuilder(); + _extraBytes = null; + _lastSliceRun = -1; + + if (_log.shouldLog(Log.INFO)) + _log.info("Connected with peer: " + s.getInetAddress() + ":" + s.getPort()); + updateMaxQueuedMessages(); } /** how long has this connection been around for, or -1 if it isn't established yet */ public long getLifetime() { - if (_created > 0) - return Clock.getInstance().now() - _created; - else - return -1; + if (_created > 0) + return Clock.getInstance().now() - _created; + else + return -1; } protected boolean weInitiatedConnection() { return _weInitiated; } private void updateMaxQueuedMessages() { - String str = Router.getInstance().getConfigSetting(PARAM_MAX_QUEUED_MESSAGES); - if ( (str != null) && (str.trim().length() > 0) ) { - try { - int max = Integer.parseInt(str); - _maxQueuedMessages = max; - return; - } catch (NumberFormatException nfe) { - _log.warn("Invalid max queued messages [" + str + "]"); - } - } - _maxQueuedMessages = DEFAULT_MAX_QUEUED_MESSAGES; + String str = Router.getInstance().getConfigSetting(PARAM_MAX_QUEUED_MESSAGES); + if ( (str != null) && (str.trim().length() > 0) ) { + try { + int max = Integer.parseInt(str); + _maxQueuedMessages = max; + return; + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid max queued messages [" + str + "]"); + } + } + _maxQueuedMessages = DEFAULT_MAX_QUEUED_MESSAGES; } public RouterIdentity getRemoteRouterIdentity() { return _remoteIdentity; } int getId() { return _id; } int getPendingMessageCount() { - synchronized (_toBeSent) { - return _toBeSent.size(); - } + synchronized (_toBeSent) { + return _toBeSent.size(); + } } protected void exchangeKey() throws IOException, DataFormatException { - BigInteger myPub = _builder.getMyPublicValue(); - byte myPubBytes[] = myPub.toByteArray(); - DataHelper.writeLong(_out, 2, myPubBytes.length); - _out.write(myPubBytes); + BigInteger myPub = _builder.getMyPublicValue(); + byte myPubBytes[] = myPub.toByteArray(); + DataHelper.writeLong(_out, 2, myPubBytes.length); + _out.write(myPubBytes); - int rlen = (int)DataHelper.readLong(_in, 2); - byte peerPubBytes[] = new byte[rlen]; - int read = DataHelper.read(_in, peerPubBytes); + int rlen = (int)DataHelper.readLong(_in, 2); + byte peerPubBytes[] = new byte[rlen]; + int read = DataHelper.read(_in, peerPubBytes); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("rlen: " + rlen + " peerBytes: " + DataHelper.toString(peerPubBytes) + " read: " + read); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("rlen: " + rlen + " peerBytes: " + DataHelper.toString(peerPubBytes) + " read: " + read); - - BigInteger peerPub = new BigInteger(1, peerPubBytes); - _builder.setPeerPublicValue(peerPub); + BigInteger peerPub = new BigInteger(1, peerPubBytes); + _builder.setPeerPublicValue(peerPub); - _key = _builder.getSessionKey(); - _extraBytes = _builder.getExtraBytes(); - _iv = new byte[16]; - System.arraycopy(_extraBytes.getData(), 0, _iv, 0, Math.min(_extraBytes.getData().length, _iv.length)); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Session key: " + _key.toBase64() + " extra bytes: " + _extraBytes.getData().length); + _key = _builder.getSessionKey(); + _extraBytes = _builder.getExtraBytes(); + _iv = new byte[16]; + System.arraycopy(_extraBytes.getData(), 0, _iv, 0, Math.min(_extraBytes.getData().length, _iv.length)); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Session key: " + _key.toBase64() + " extra bytes: " + _extraBytes.getData().length); } protected boolean identifyStationToStation() throws IOException, DataFormatException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - _myIdentity.writeBytes(baos); - Hash keyHash = SHA256Generator.getInstance().calculateHash(_key.getData()); - keyHash.writeBytes(baos); - Signature sig = DSAEngine.getInstance().sign(baos.toByteArray(), _signingKey); - sig.writeBytes(baos); + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + _myIdentity.writeBytes(baos); + Hash keyHash = SHA256Generator.getInstance().calculateHash(_key.getData()); + keyHash.writeBytes(baos); + Signature sig = DSAEngine.getInstance().sign(baos.toByteArray(), _signingKey); + sig.writeBytes(baos); + + byte encr[] = AESEngine.getInstance().safeEncrypt(baos.toByteArray(), _key, _iv, 1024); + DataHelper.writeLong(_out, 2, encr.length); + _out.write(encr); - byte encr[] = AESEngine.getInstance().safeEncrypt(baos.toByteArray(), _key, _iv, 1024); - DataHelper.writeLong(_out, 2, encr.length); - _out.write(encr); + // we've identified ourselves, now read who they are + int rlen = (int)DataHelper.readLong(_in, 2); + byte pencr[] = new byte[rlen]; + int read = DataHelper.read(_in, pencr); + if (read != rlen) + throw new DataFormatException("Not enough data in peer ident"); + byte decr[] = AESEngine.getInstance().safeDecrypt(pencr, _key, _iv); + if (decr == null) + throw new DataFormatException("Unable to decrypt - failed exchange?"); - // we've identified ourselves, now read who they are + ByteArrayInputStream bais = new ByteArrayInputStream(decr); + _remoteIdentity = new RouterIdentity(); + _remoteIdentity.readBytes(bais); + Hash peerKeyHash = new Hash(); + peerKeyHash.readBytes(bais); - int rlen = (int)DataHelper.readLong(_in, 2); - byte pencr[] = new byte[rlen]; - int read = DataHelper.read(_in, pencr); - if (read != rlen) - throw new DataFormatException("Not enough data in peer ident"); - byte decr[] = AESEngine.getInstance().safeDecrypt(pencr, _key, _iv); - if (decr == null) - throw new DataFormatException("Unable to decrypt - failed exchange?"); - - ByteArrayInputStream bais = new ByteArrayInputStream(decr); - _remoteIdentity = new RouterIdentity(); - _remoteIdentity.readBytes(bais); - Hash peerKeyHash = new Hash(); - peerKeyHash.readBytes(bais); - - if (!peerKeyHash.equals(keyHash)) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Peer tried to spoof!"); - return false; - } + if (!peerKeyHash.equals(keyHash)) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Peer tried to spoof!"); + return false; + } - Signature rsig = new Signature(); - rsig.readBytes(bais); - byte signedData[] = new byte[decr.length - rsig.getData().length]; - System.arraycopy(decr, 0, signedData, 0, signedData.length); - return DSAEngine.getInstance().verifySignature(rsig, signedData, _remoteIdentity.getSigningPublicKey()); + Signature rsig = new Signature(); + rsig.readBytes(bais); + byte signedData[] = new byte[decr.length - rsig.getData().length]; + System.arraycopy(decr, 0, signedData, 0, signedData.length); + return DSAEngine.getInstance().verifySignature(rsig, signedData, _remoteIdentity.getSigningPublicKey()); } protected final static int ESTABLISHMENT_TIMEOUT = 10*1000; // 10 second lag (not necessarily for the entire establish) public RouterIdentity establishConnection() { - BigInteger myPub = _builder.getMyPublicValue(); - try { - _socket.setSoTimeout(ESTABLISHMENT_TIMEOUT); - exchangeKey(); - // key exchanged. now say who we are and prove it - boolean ok = identifyStationToStation(); - - if (!ok) - throw new DataFormatException("Station to station identification failed! MITM?"); - else { - if (_log.shouldLog(Log.INFO)) - _log.info("TCP connection " + _id + " established with " + _remoteIdentity.getHash().toBase64()); - _in = new AESInputStream(new BandwidthLimitedInputStream(_in, _remoteIdentity), _key, _iv); - _out = new AESOutputStream(new BandwidthLimitedOutputStream(_out, _remoteIdentity), _key, _iv); - _socket.setSoTimeout(0); - established(); - return _remoteIdentity; - } - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error establishing connection", ioe); - closeConnection(); - return null; - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error establishing connection", dfe); - closeConnection(); - return null; - } catch (Throwable t) { - if (_log.shouldLog(Log.ERROR)) - _log.error("jrandom is paranoid so we're catching it all during establishConnection", t); - closeConnection(); - return null; - } + BigInteger myPub = _builder.getMyPublicValue(); + try { + _socket.setSoTimeout(ESTABLISHMENT_TIMEOUT); + exchangeKey(); + // key exchanged. now say who we are and prove it + boolean ok = identifyStationToStation(); + + if (!ok) + throw new DataFormatException("Station to station identification failed! MITM?"); + else { + if (_log.shouldLog(Log.INFO)) + _log.info("TCP connection " + _id + " established with " + + _remoteIdentity.getHash().toBase64()); + _in = new AESInputStream(new BandwidthLimitedInputStream(_in, _remoteIdentity), _key, _iv); + _out = new AESOutputStream(new BandwidthLimitedOutputStream(_out, _remoteIdentity), _key, _iv); + _socket.setSoTimeout(0); + established(); + return _remoteIdentity; + } + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error establishing connection", ioe); + closeConnection(); + return null; + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error establishing connection", dfe); + closeConnection(); + return null; + } catch (Throwable t) { + if (_log.shouldLog(Log.ERROR)) + _log.error("jrandom is paranoid so we're catching it all during establishConnection", t); + closeConnection(); + return null; + } } protected void established() { _created = Clock.getInstance().now(); } public void runConnection() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Run connection"); - _runner = new ConnectionRunner(); - Thread t = new I2PThread(_runner); - t.setName("Run Conn [" + _id + "]"); - t.setDaemon(true); - t.start(); - _reader = new I2NPMessageReader(_in, this, "TCP Read [" + _id + "]"); - _reader.startReading(); + if (_log.shouldLog(Log.DEBUG)) + log.debug("Run connection"); + _runner = new ConnectionRunner(); + Thread t = new I2PThread(_runner); + t.setName("Run Conn [" + _id + "]"); + t.setDaemon(true); + t.start(); + _reader = new I2NPMessageReader(_in, this, "TCP Read [" + _id + "]"); + _reader.startReading(); } public void setTransport(TCPTransport trans) { _transport = trans; } public void addMessage(OutNetMessage msg) { - msg.timestamp("TCPConnection.addMessage"); - int totalPending = 0; - boolean fail = false; - long beforeAdd = Clock.getInstance().now(); - synchronized (_toBeSent) { - if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) { - fail = true; - } else { - _toBeSent.add(msg); - totalPending = _toBeSent.size(); - // the ConnectionRunner.processSlice does a wait() until we have messages - } - _toBeSent.notifyAll(); - } - long afterAdd = Clock.getInstance().now(); + msg.timestamp("TCPConnection.addMessage"); + int totalPending = 0; + boolean fail = false; + long beforeAdd = Clock.getInstance().now(); + synchronized (_toBeSent) { + if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) { + fail = true; + } else { + _toBeSent.add(msg); + totalPending = _toBeSent.size(); + // the ConnectionRunner.processSlice does a wait() until we have messages + } + _toBeSent.notifyAll(); + } + long afterAdd = Clock.getInstance().now(); + + StatManager.getInstance().addRateData("tcp.queueSize", totalPending-1, 0); + + if (fail) { + if (_log.shouldLog(Log.ERROR)) + _log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64()); + + msg.timestamp("TCPConnection.addMessage exceeded max queued"); + _transport.afterSend(msg, false); + return; + } - StatManager.getInstance().addRateData("tcp.queueSize", totalPending-1, 0); - - if (fail) { - if (_log.shouldLog(Log.ERROR)) - _log.error("too many queued messages"); - - msg.timestamp("TCPConnection.addMessage exceeded max queued"); - _transport.afterSend(msg, false); - return; - } - - long diff = afterAdd - beforeAdd; - if (diff > 500) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Lock contention adding a message: " + diff + "ms"); - } - - msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify"); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add message with toBeSent.size = " + totalPending + " to " + _remoteIdentity.getHash().toBase64()); - if (totalPending <= 0) { - if (_log.shouldLog(Log.ERROR)) - _log.error("WTF, total pending after adding " + msg.getMessage().getClass().getName() + " <= 0! " + msg); - } - - if (slicesTooLong()) { - if (_log.shouldLog(Log.ERROR)) - _log.error("onAdd: Slices are taking too long (" + (Clock.getInstance().now()-_lastSliceRun) +"ms) - perhaps the remote side is disconnected or hung? remote=" + _remoteIdentity.getHash().toBase64()); - closeConnection(); - } + long diff = afterAdd - beforeAdd; + if (diff > 500) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Lock contention adding a message: " + diff + "ms"); + } + + msg.timestamp("TCPConnection.addMessage after toBeSent.add and notify"); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add message with toBeSent.size = " + totalPending + " to " + _remoteIdentity.getHash().toBase64()); + if (totalPending <= 0) { + if (_log.shouldLog(Log.ERROR)) + _log.error("WTF, total pending after adding " + msg.getMessage().getClass().getName() + " <= 0! " + msg); + } + + if (slicesTooLong()) { + if (_log.shouldLog(Log.ERROR)) { + long sliceTime = Clock.getInstance().now()-_lastSliceRun; + _log.error("onAdd: Slices are taking too long (" + sliceTime + + "ms) - perhaps the remote side is disconnected or hung? remote=" + + _remoteIdentity.getHash().toBase64()); + } + closeConnection(); + } } void closeConnection() { - if (_closed) return; - _closed = true; - if (_remoteIdentity != null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Closing the connection to " + _remoteIdentity.getHash().toBase64(), new Exception("Closed by")); - } else { - if (_socket != null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Closing the unestablished connection with " + _socket.getInetAddress().toString() + ":" + _socket.getPort(), new Exception("Closed by")); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Closing the unestablished connection", new Exception("Closed by")); - } - } - if (_reader != null) _reader.stopReading(); - if (_runner != null) _runner.stopRunning(); - if (_in != null) try { _in.close(); } catch (IOException ioe) {} - if (_out != null) try { _out.close(); } catch (IOException ioe) {} - if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} - if (_toBeSent != null) { - long now = Clock.getInstance().now(); - synchronized (_toBeSent) { - for (Iterator iter = _toBeSent.iterator(); iter.hasNext(); ) { - OutNetMessage msg = (OutNetMessage)iter.next(); - msg.timestamp("TCPTransport.closeConnection caused fail"); - if (_log.shouldLog(Log.WARN)) - _log.warn("Connection closed while the message was sitting on the TCP Connection's queue! too slow by: " + (now-msg.getExpiration()) + "ms: " + msg); - _transport.afterSend(msg, false); - } - _toBeSent.clear(); - } - } - _transport.connectionClosed(this); + if (_closed) return; + _closed = true; + if (_remoteIdentity != null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Closing the connection to " + _remoteIdentity.getHash().toBase64(), + new Exception("Closed by")); + } else { + if (_socket != null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Closing the unestablished connection with " + + _socket.getInetAddress().toString() + ":" + + _socket.getPort(), new Exception("Closed by")); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Closing the unestablished connection", new Exception("Closed by")); + } + } + if (_reader != null) _reader.stopReading(); + if (_runner != null) _runner.stopRunning(); + if (_in != null) try { _in.close(); } catch (IOException ioe) {} + if (_out != null) try { _out.close(); } catch (IOException ioe) {} + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} + if (_toBeSent != null) { + long now = Clock.getInstance().now(); + synchronized (_toBeSent) { + for (Iterator iter = _toBeSent.iterator(); iter.hasNext(); ) { + OutNetMessage msg = (OutNetMessage)iter.next(); + msg.timestamp("TCPTransport.closeConnection caused fail"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Connection closed while the message was sitting on the TCP Connection's queue! too slow by: " + + (now-msg.getExpiration()) + "ms: " + msg); + _transport.afterSend(msg, false); + } + _toBeSent.clear(); + } + } + _transport.connectionClosed(this); } List getPendingMessages() { return _toBeSent; } public void disconnected(I2NPMessageReader reader) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Remote disconnected: " + _remoteIdentity.getHash().toBase64()); - closeConnection(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Remote disconnected: " + _remoteIdentity.getHash().toBase64()); + closeConnection(); } public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToReceive) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Message received from " + _remoteIdentity.getHash().toBase64()); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(32*1024); - message.writeBytes(baos); - int size = baos.size(); - // this is called by the I2NPMessageReader's thread, so it delays the reading from this peer only - //_log.debug("Delaying inbound for size " + size); - //BandwidthLimiter.getInstance().delayInbound(_remoteIdentity, size); - _transport.messageReceived(message, _remoteIdentity, null, msToReceive, size); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("How did we read a message that is poorly formatted...", dfe); - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("How did we read a message that can't be written to memory...", ioe); - } - - if (slicesTooLong()) { - if (_log.shouldLog(Log.ERROR)) - _log.error("onReceive: Slices are taking too long (" + (Clock.getInstance().now()-_lastSliceRun) +"ms) - perhaps the remote side is disconnected or hung? peer = " + _remoteIdentity.getHash().toBase64()); - closeConnection(); - } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message received from " + _remoteIdentity.getHash().toBase64()); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(32*1024); + message.writeBytes(baos); + int size = baos.size(); + // this is called by the I2NPMessageReader's thread, so it delays the reading from this peer only + //_log.debug("Delaying inbound for size " + size); + //BandwidthLimiter.getInstance().delayInbound(_remoteIdentity, size); + _transport.messageReceived(message, _remoteIdentity, null, msToReceive, size); + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("How did we read a message that is poorly formatted...", dfe); + } catch (IOException ioe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("How did we read a message that can't be written to memory...", ioe); + } + + if (slicesTooLong()) { + if (_log.shouldLog(Log.ERROR)) { + long sliceTime = Clock.getInstance().now()-_lastSliceRun; + _log.error("onReceive: Slices are taking too long (" + sliceTime + + "ms) - perhaps the remote side is disconnected or hung? peer = " + + _remoteIdentity.getHash().toBase64()); + } + closeConnection(); + } } public void readError(I2NPMessageReader reader, Exception error) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error reading from stream to " + _remoteIdentity.getHash().toBase64()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Error reading from stream to " + _remoteIdentity.getHash().toBase64(), error); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error reading from stream to " + _remoteIdentity.getHash().toBase64()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Error reading from stream to " + _remoteIdentity.getHash().toBase64(), error); } /** @@ -402,132 +416,148 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { * */ private boolean slicesTooLong() { - if (_lastSliceRun <= 0) return false; - long diff = Clock.getInstance().now() - _lastSliceRun; - return (diff > MAX_SLICE_DURATION); + if (_lastSliceRun <= 0) return false; + long diff = Clock.getInstance().now() - _lastSliceRun; + return (diff > MAX_SLICE_DURATION); } - class ConnectionRunner implements Runnable { - private boolean _running; - public void run() { - _running = true; - try { - while (_running) { - long startSlice = Clock.getInstance().now(); - _lastSliceRun = startSlice; - processSlice(); - long endSlice = Clock.getInstance().now(); - } - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Connection runner failed with an IO exception to " + _remoteIdentity.getHash().toBase64(), ioe); - closeConnection(); - } catch (Throwable t) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Somehow we ran into an uncaught exception running the connection!", t); - closeConnection(); - } - } - - private void processSlice() throws IOException { - long start = Clock.getInstance().now(); - - OutNetMessage msg = null; - int remaining = 0; - List timedOut = new LinkedList(); - - synchronized (_toBeSent) { - // loop through, dropping expired messages, waiting until a non-expired - // one is added, or 30 seconds have passed (catchall in case things bork) - while (msg == null) { - if (_toBeSent.size() <= 0) { - try { - _toBeSent.wait(30*1000); - } catch (InterruptedException ie) {} - } - remaining = _toBeSent.size(); - if (remaining <= 0) return; - msg = (OutNetMessage)_toBeSent.remove(0); - remaining--; - if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) { - timedOut.add(msg); - msg = null; // keep looking - } - } - } - - for (Iterator iter = timedOut.iterator(); iter.hasNext(); ) { - OutNetMessage failed = (OutNetMessage)iter.next(); - if (_log.shouldLog(Log.WARN)) - _log.warn("Message timed out while sitting on the TCP Connection's queue! was too slow by: " + (start-msg.getExpiration()) + "ms to " + _remoteIdentity.getHash().toBase64() + ": " + msg); - msg.timestamp("TCPConnection.runner.processSlice expired"); - _transport.afterSend(msg, false); - return; - } - - if (remaining > 0) { - if (_log.shouldLog(Log.INFO)) - _log.info("After pulling off a pending message, there are still " + remaining + - " messages queued up for sending to " + _remoteIdentity.getHash().toBase64()); - } - - long afterExpire = Clock.getInstance().now(); - - if (msg != null) { - msg.timestamp("TCPConnection.runner.processSlice fetched"); - //_log.debug("Processing slice - msg to be sent"); - - try { - byte data[] = msg.getMessageData(); - msg.timestamp("TCPConnection.runner.processSlice before sending " + data.length + " bytes"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending " + data.length + " bytes in the slice... to " + _remoteIdentity.getHash().toBase64()); - - synchronized (_out) { - _out.write(data); - _out.flush(); - } - - msg.timestamp("TCPConnection.runner.processSlice sent and flushed"); - long end = Clock.getInstance().now(); - long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end; - if (_log.shouldLog(Log.INFO)) - _log.info("Message " + msg.getMessage().getClass().getName() + " (expiring in " + timeLeft + "ms) sent to " + _remoteIdentity.getHash().toBase64() + " from " + _myIdentity.getHash().toBase64() + - " over connection " + _id + " with " + data.length + " bytes in " + (end - start) + "ms"); - if (timeLeft < 10*1000) { - if (_log.shouldLog(Log.DEBUG)) - _log.warn("Very little time left... time to send [" + (end-start) + "] time left [" + timeLeft + "] to " + - _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(), msg.getCreatedBy()); - } + private boolean _running; + public void run() { + _running = true; + try { + while (_running) { + long startSlice = Clock.getInstance().now(); + _lastSliceRun = startSlice; + processSlice(); + long endSlice = Clock.getInstance().now(); + } + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Connection runner failed with an IO exception to " + + _remoteIdentity.getHash().toBase64(), ioe); + closeConnection(); + } catch (Throwable t) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Somehow we ran into an uncaught exception running the connection!", t); + closeConnection(); + } + } - long lifetime = msg.getLifetime(); - if (lifetime > 10*1000) { - if (_log.shouldLog(Log.WARN)) - _log.warn("The processing of the message took way too long (" + lifetime + "ms) - time to send (" + (end-start) + "), time left (" + timeLeft + ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString()); - } - _transport.afterSend(msg, true); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Processing slice - message sent completely: " + msg.getMessage().getClass().getName() + " to " + _remoteIdentity.getHash().toBase64()); - if (end - afterExpire > 1000) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Actual sending took too long ( " + (end-afterExpire) + "ms) sending " + data.length + " bytes to " + _remoteIdentity.getHash().toBase64()); - } - } catch (IOException ioe) { - msg.timestamp("TCPConnection.runner.processSlice failed to send/flushflushed"); - _transport.afterSend(msg, false); - throw ioe; - } - } - } - - public void stopRunning() { - _running = false; - // stop the wait(...) - synchronized (_toBeSent) { - _toBeSent.notifyAll(); - } - } + private void processSlice() throws IOException { + long start = Clock.getInstance().now(); + + OutNetMessage msg = null; + int remaining = 0; + List timedOut = new LinkedList(); + + synchronized (_toBeSent) { + // loop through, dropping expired messages, waiting until a non-expired + // one is added, or 30 seconds have passed (catchall in case things bork) + while (msg == null) { + if (_toBeSent.size() <= 0) { + try { + _toBeSent.wait(30*1000); + } catch (InterruptedException ie) {} + } + remaining = _toBeSent.size(); + if (remaining <= 0) return; + msg = (OutNetMessage)_toBeSent.remove(0); + remaining--; + if ( (msg.getExpiration() > 0) && (msg.getExpiration() < start) ) { + timedOut.add(msg); + msg = null; // keep looking + } + } + } + + for (Iterator iter = timedOut.iterator(); iter.hasNext(); ) { + OutNetMessage failed = (OutNetMessage)iter.next(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Message timed out while sitting on the TCP Connection's queue! was too slow by: " + + (start-msg.getExpiration()) + "ms to " + + _remoteIdentity.getHash().toBase64() + ": " + msg); + msg.timestamp("TCPConnection.runner.processSlice expired"); + _transport.afterSend(msg, false); + return; + } + + if (remaining > 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("After pulling off a pending message, there are still " + remaining + + " messages queued up for sending to " + _remoteIdentity.getHash().toBase64()); + } + + long afterExpire = Clock.getInstance().now(); + + if (msg != null) { + msg.timestamp("TCPConnection.runner.processSlice fetched"); + //_log.debug("Processing slice - msg to be sent"); + + try { + byte data[] = msg.getMessageData(); + msg.timestamp("TCPConnection.runner.processSlice before sending " + + data.length + " bytes"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending " + data.length + " bytes in the slice... to " + + _remoteIdentity.getHash().toBase64()); + + synchronized (_out) { + _out.write(data); + _out.flush(); + } + + msg.timestamp("TCPConnection.runner.processSlice sent and flushed"); + long end = Clock.getInstance().now(); + long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end; + if (_log.shouldLog(Log.INFO)) + _log.info("Message " + msg.getMessage().getClass().getName() + + " (expiring in " + timeLeft + "ms) sent to " + + _remoteIdentity.getHash().toBase64() + " from " + + _myIdentity.getHash().toBase64() + + " over connection " + _id + " with " + data.length + + " bytes in " + (end - start) + "ms"); + if (timeLeft < 10*1000) { + if (_log.shouldLog(Log.DEBUG)) + _log.warn("Very little time left... time to send [" + (end-start) + + "] time left [" + timeLeft + "] to " + + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString(), + msg.getCreatedBy()); + } + + long lifetime = msg.getLifetime(); + if (lifetime > 10*1000) { + if (_log.shouldLog(Log.WARN)) + _log.warn("The processing of the message took way too long (" + lifetime + + "ms) - time to send (" + (end-start) + "), time left (" + timeLeft + + ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString()); + } + _transport.afterSend(msg, true); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Processing slice - message sent completely: " + + msg.getMessage().getClass().getName() + " to " + + _remoteIdentity.getHash().toBase64()); + if (end - afterExpire > 1000) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Actual sending took too long ( " + (end-afterExpire) + + "ms) sending " + data.length + " bytes to " + + _remoteIdentity.getHash().toBase64()); + } + } catch (IOException ioe) { + msg.timestamp("TCPConnection.runner.processSlice failed to send/flushflushed"); + _transport.afterSend(msg, false); + throw ioe; + } + } + } + + public void stopRunning() { + _running = false; + // stop the wait(...) + synchronized (_toBeSent) { + _toBeSent.notifyAll(); + } + } } }