diff --git a/router/java/src/net/i2p/router/ClientMessagePool.java b/router/java/src/net/i2p/router/ClientMessagePool.java index f5b33d717..816fa91fb 100644 --- a/router/java/src/net/i2p/router/ClientMessagePool.java +++ b/router/java/src/net/i2p/router/ClientMessagePool.java @@ -35,12 +35,26 @@ public class ClientMessagePool { * */ public void add(ClientMessage msg) { - if ( (_context.clientManager().isLocal(msg.getDestination())) || + add(msg, false); + } + /** + * If we're coming from the client subsystem itself, we already know whether + * the target is definitely remote and as such don't need to recheck + * ourselves, but if we aren't certain, we want it to check for us. + * + * @param isDefinitelyRemote true if we know for sure that the target is not local + * + */ + public void add(ClientMessage msg, boolean isDefinitelyRemote) { + if ( !isDefinitelyRemote || + (_context.clientManager().isLocal(msg.getDestination())) || (_context.clientManager().isLocal(msg.getDestinationHash())) ) { - _log.debug("Adding message for local delivery"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Adding message for local delivery"); _context.clientManager().messageReceived(msg); } else { - _log.debug("Adding message for remote delivery"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Adding message for remote delivery"); _context.jobQueue().addJob(new OutboundClientMessageJob(_context, msg)); } } diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 9d95d6c86..5cdaac9c1 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -13,7 +13,7 @@ import java.io.OutputStream; import java.net.Socket; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,6 +35,7 @@ import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; import net.i2p.util.RandomSource; +import net.i2p.util.I2PThread; /** * Bridge the router and the client - managing state for a client. @@ -68,6 +69,7 @@ public class ClientConnectionRunner { * delivered to the client (so that we can be sure only to update when necessary) */ private List _alreadyProcessed; + private ClientWriterRunner _writer; /** are we, uh, dead */ private boolean _dead; @@ -82,11 +84,12 @@ public class ClientConnectionRunner { _socket = socket; _config = null; _messages = new HashMap(); - _alreadyProcessed = new LinkedList(); + _alreadyProcessed = new ArrayList(); _acceptedPending = new HashSet(); _dead = false; } + private static int __id = 0; /** * Actually run the connection - listen for I2CP messages and respond. This * is the main driver for this class, though it gets all its meat from the @@ -96,6 +99,12 @@ public class ClientConnectionRunner { public void startRunning() { try { _reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this)); + _writer = new ClientWriterRunner(); + I2PThread t = new I2PThread(_writer); + t.setName("Writer " + ++__id); + t.setDaemon(true); + t.setPriority(I2PThread.MIN_PRIORITY); + t.start(); _out = _socket.getOutputStream(); _reader.startReading(); } catch (IOException ioe) { @@ -112,6 +121,7 @@ public class ClientConnectionRunner { _dead = true; // we need these keys to unpublish the leaseSet if (_reader != null) _reader.stopReading(); + if (_writer != null) _writer.stopWriting(); if (_socket != null) try { _socket.close(); } catch (IOException ioe) { } synchronized (_messages) { _messages.clear(); @@ -143,9 +153,50 @@ public class ClientConnectionRunner { /** already closed? */ boolean isDead() { return _dead; } /** message body */ - Payload getPayload(MessageId id) { synchronized (_messages) { return (Payload)_messages.get(id); } } - void setPayload(MessageId id, Payload payload) { synchronized (_messages) { _messages.put(id, payload); } } - void removePayload(MessageId id) { synchronized (_messages) { _messages.remove(id); } } + Payload getPayload(MessageId id) { + Payload rv = null; + long beforeLock = _context.clock().now(); + long inLock = 0; + synchronized (_messages) { + inLock = _context.clock().now(); + rv = (Payload)_messages.get(id); + } + long afterLock = _context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + return rv; + } + void setPayload(MessageId id, Payload payload) { + long beforeLock = _context.clock().now(); + long inLock = 0; + synchronized (_messages) { + inLock = _context.clock().now(); + _messages.put(id, payload); + } + long afterLock = _context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("setPayload.locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + } + void removePayload(MessageId id) { + long beforeLock = _context.clock().now(); + long inLock = 0; + synchronized (_messages) { + inLock = _context.clock().now(); + _messages.remove(id); + } + long afterLock = _context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("removePayload.locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + } void sessionEstablished(SessionConfig config) { if (_log.shouldLog(Log.DEBUG)) @@ -183,8 +234,6 @@ public class ClientConnectionRunner { doSend(msg); } catch (I2CPMessageException ime) { _log.error("Error writing out the disconnect message", ime); - } catch (IOException ioe) { - _log.error("Error writing out the disconnect message", ioe); } stopRunning(); } @@ -200,15 +249,31 @@ public class ClientConnectionRunner { Destination dest = message.getDestination(); MessageId id = new MessageId(); id.setMessageId(getNextMessageId()); + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_acceptedPending) { + inLock = _context.clock().now(); _acceptedPending.add(id); } + long afterLock = _context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("distributeMessage.locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + if (_log.shouldLog(Log.DEBUG)) _log.debug("** Recieving message [" + id.getMessageId() + "] with payload of size [" + payload.getSize() + "]" + " for session [" + _sessionId.getSessionId() + "]"); + long beforeDistribute = _context.clock().now(); // the following blocks as described above _manager.distributeMessage(_config.getDestination(), message.getDestination(), message.getPayload(), id); + long timeToDistribute = _context.clock().now() - beforeDistribute; + if (timeToDistribute > 50) + _log.warn("Took too long to distribute in the manager to " + + message.getDestination().calculateHash().toBase64() + ": " + + timeToDistribute); return id; } @@ -229,13 +294,20 @@ public class ClientConnectionRunner { status.setStatus(MessageStatusMessage.STATUS_SEND_ACCEPTED); try { doSend(status); + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_acceptedPending) { + inLock = _context.clock().now(); _acceptedPending.remove(id); } + long afterLock = _context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("ackSendMessage.locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } } catch (I2CPMessageException ime) { _log.error("Error writing out the message status message", ime); - } catch (IOException ioe) { - _log.error("Error writing out the message status message", ioe); } } @@ -282,11 +354,96 @@ public class ClientConnectionRunner { //// //// + /** + * Async writer class so that if a client app hangs, they wont take down the + * whole router with them (since otherwise the JobQueue would block until + * the client reads from their i2cp socket, causing all sorts of bad shit to + * happen) + * + */ + private class ClientWriterRunner implements Runnable { + private List _messagesToWrite; + public ClientWriterRunner() { + _messagesToWrite = new ArrayList(2); + } + + /** + * Add this message to the writer's queue + * + */ + public void addMessage(I2CPMessage msg) { + synchronized (_messagesToWrite) { + _messagesToWrite.add(msg); + _messagesToWrite.notify(); + } + } + + /** + * No more messages - dont even try to send what we have + * + */ + public void stopWriting() { + synchronized (_messagesToWrite) { + _messagesToWrite.notify(); + } + } + public void run() { + while (!_dead) { + I2CPMessage msg = null; + synchronized (_messagesToWrite) { + if (_messagesToWrite.size() > 0) { + // we do this test before and after wait, in case more than + // one message gets enqueued + msg = (I2CPMessage)_messagesToWrite.remove(0); + } else { + try { + _messagesToWrite.wait(); + } catch (InterruptedException ie) {} + + if (_messagesToWrite.size() > 0) + msg = (I2CPMessage)_messagesToWrite.remove(0); + } + } + if (msg != null) + writeMessage(msg); + } + } + + private void writeMessage(I2CPMessage msg) { + long before = _context.clock().now(); + try { + synchronized (_out) { + msg.writeMessage(_out); + _out.flush(); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("after doSend of a "+ msg.getClass().getName() + " message"); + } catch (I2CPMessageException ime) { + _log.error("Message exception sending I2CP message", ime); + stopRunning(); + } catch (IOException ioe) { + _log.error("IO exception sending I2CP message", ioe); + stopRunning(); + } catch (Throwable t) { + _log.log(Log.CRIT, "Unhandled exception sending I2CP message", t); + stopRunning(); + } finally { + long after = _context.clock().now(); + long lag = after - before; + if (lag > 300) { + if (_log.shouldLog(Log.WARN)) + _log.warn("synchronization on the i2cp message send took too long (" + lag + + "ms): " + msg); + } + } + } + } + /** * Actually send the I2CPMessage to the peer through the socket * */ - void doSend(I2CPMessage msg) throws I2CPMessageException, IOException { + void doSend(I2CPMessage msg) throws I2CPMessageException { if (_out == null) throw new I2CPMessageException("Output stream is not initialized"); if (msg == null) throw new I2CPMessageException("Null message?!"); if (_log.shouldLog(Log.DEBUG)) { @@ -298,32 +455,8 @@ public class ClientConnectionRunner { + " message on for " + _config.getDestination().calculateHash().toBase64()); } - long before = _context.clock().now(); - try { - synchronized (_out) { - msg.writeMessage(_out); - _out.flush(); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("after doSend of a "+ msg.getClass().getName() + " message"); - } catch (I2CPMessageException ime) { - _log.error("Message exception sending I2CP message", ime); - throw ime; - } catch (IOException ioe) { - _log.error("IO exception sending I2CP message", ioe); - throw ioe; - } catch (Throwable t) { - _log.log(Log.CRIT, "Unhandled exception sending I2CP message", t); - throw new IOException("Unhandled exception sending I2CP message: " + t.getMessage()); - } finally { - long after = _context.clock().now(); - long lag = after - before; - if (lag > 300) { - if (_log.shouldLog(Log.WARN)) - _log.warn("synchronization on the i2cp message send took too long (" + lag - + "ms): " + msg); - } - } + _writer.addMessage(msg); + } // this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME @@ -350,12 +483,21 @@ public class ClientConnectionRunner { boolean isPending = false; int pending = 0; String buf = null; + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_acceptedPending) { + inLock = _context.clock().now(); if (_acceptedPending.contains(id)) isPending = true; pending = _acceptedPending.size(); buf = _acceptedPending.toString(); } + long afterLock = _context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } if (pending >= 1) { _log.warn("Pending acks: " + pending + ": " + buf); } @@ -405,16 +547,28 @@ public class ClientConnectionRunner { return; } + boolean alreadyProcessed = false; + long beforeLock = MessageDeliveryStatusUpdate.this._context.clock().now(); + long inLock = 0; synchronized (_alreadyProcessed) { + inLock = MessageDeliveryStatusUpdate.this._context.clock().now(); if (_alreadyProcessed.contains(_messageId)) { _log.warn("Status already updated"); - return; + alreadyProcessed = true; } else { _alreadyProcessed.add(_messageId); while (_alreadyProcessed.size() > 10) _alreadyProcessed.remove(0); } } + long afterLock = MessageDeliveryStatusUpdate.this._context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("MessageDeliveryStatusUpdate.locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + + if (alreadyProcessed) return; if (_lastTried > 0) { if (_log.shouldLog(Log.DEBUG)) @@ -435,8 +589,6 @@ public class ClientConnectionRunner { doSend(msg); } catch (I2CPMessageException ime) { _log.warn("Error updating the status for message ID " + _messageId, ime); - } catch (IOException ioe) { - _log.warn("Error updating the status for message ID " + _messageId, ioe); } } } diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index ad5a9266f..f1064de1f 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -116,7 +116,8 @@ public class ClientManager { // check if there is a runner for it ClientConnectionRunner runner = getRunner(toDest); if (runner != null) { - _log.debug("Message " + msgId + " is targeting a local destination. distribute it as such"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message " + msgId + " is targeting a local destination. distribute it as such"); runner.receiveMessage(toDest, fromDest, payload); if (fromDest != null) { ClientConnectionRunner sender = getRunner(fromDest); @@ -128,7 +129,8 @@ public class ClientManager { } } else { // remote. w00t - _log.debug("Message " + msgId + " is targeting a REMOTE destination! Added to the client message pool"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message " + msgId + " is targeting a REMOTE destination! Added to the client message pool"); runner = getRunner(fromDest); ClientMessage msg = new ClientMessage(); msg.setDestination(toDest); @@ -137,7 +139,7 @@ public class ClientManager { msg.setSenderConfig(runner.getConfig()); msg.setFromDestination(runner.getConfig().getDestination()); msg.setMessageId(msgId); - _context.clientMessagePool().add(msg); + _context.clientMessagePool().add(msg, true); } } @@ -169,16 +171,35 @@ public class ClientManager { public boolean isLocal(Destination dest) { + boolean rv = false; + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_runners) { - return (_runners.containsKey(dest)); + inLock = _context.clock().now(); + rv = _runners.containsKey(dest); } + long afterLock = _context.clock().now(); + + if (afterLock - beforeLock > 50) { + _log.warn("isLocal(Destination).locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + return rv; } public boolean isLocal(Hash destHash) { if (destHash == null) return false; Set dests = new HashSet(); + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_runners) { + inLock = _context.clock().now(); dests.addAll(_runners.keySet()); } + long afterLock = _context.clock().now(); + if (afterLock - beforeLock > 50) { + _log.warn("isLocal(Hash).locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } for (Iterator iter = dests.iterator(); iter.hasNext();) { Destination d = (Destination)iter.next(); if (d.calculateHash().equals(destHash)) return true; @@ -187,9 +208,19 @@ public class ClientManager { } private ClientConnectionRunner getRunner(Destination dest) { + ClientConnectionRunner rv = null; + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_runners) { - return (ClientConnectionRunner)_runners.get(dest); + inLock = _context.clock().now(); + rv = (ClientConnectionRunner)_runners.get(dest); } + long afterLock = _context.clock().now(); + if (afterLock - beforeLock > 50) { + _log.warn("getRunner(Dest).locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + return rv; } /** @@ -208,9 +239,18 @@ public class ClientManager { if (destHash == null) return null; Set dests = new HashSet(); + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_runners) { + inLock = _context.clock().now(); dests.addAll(_runners.keySet()); } + long afterLock = _context.clock().now(); + if (afterLock - beforeLock > 50) { + _log.warn("getRunner(Hash).locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + for (Iterator iter = dests.iterator(); iter.hasNext(); ) { Destination d = (Destination)iter.next(); if (d.calculateHash().equals(destHash)) @@ -235,9 +275,18 @@ public class ClientManager { private Set getRunnerDestinations() { Set dests = new HashSet(); + long beforeLock = _context.clock().now(); + long inLock = 0; synchronized (_runners) { + inLock = _context.clock().now(); dests.addAll(_runners.keySet()); } + long afterLock = _context.clock().now(); + if (afterLock - beforeLock > 50) { + _log.warn("getRunnerDestinations().locking took too long: " + (afterLock-beforeLock) + + " overall, synchronized took " + (inLock - beforeLock)); + } + return dests; } diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 228fcd4b3..a86c7b2e9 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -104,8 +104,6 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.doSend(new SetDateMessage()); } catch (I2CPMessageException ime) { _log.error("Error writing out the setDate message", ime); - } catch (IOException ioe) { - _log.error("Error writing out the setDate message", ioe); } } private void handleSetDate(I2CPMessageReader reader, SetDateMessage message) { @@ -119,7 +117,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi */ private void handleCreateSession(I2CPMessageReader reader, CreateSessionMessage message) { if (message.getSessionConfig().verifySignature()) { - _log.debug("Signature verified correctly on create session message"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Signature verified correctly on create session message"); } else { _log.error("Signature verification *FAILED* on a create session message. Hijack attempt?"); _runner.disconnectClient("Invalid signature on CreateSessionMessage"); @@ -143,8 +142,6 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _log.debug("after sessionEstablished for " + message.getSessionConfig().getDestination().calculateHash().toBase64()); } catch (I2CPMessageException ime) { _log.error("Error writing out the session status message", ime); - } catch (IOException ioe) { - _log.error("Error writing out the session status message", ioe); } _context.jobQueue().addJob(new CreateSessionJob(_context, _runner)); @@ -158,8 +155,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi */ private void handleSendMessage(I2CPMessageReader reader, SendMessageMessage message) { _log.debug("handleSendMessage called"); + long beforeDistribute = _context.clock().now(); MessageId id = _runner.distributeMessage(message); + long timeToDistribute = _context.clock().now() - beforeDistribute; _runner.ackSendMessage(id, message.getNonce()); + if (timeToDistribute > 50) + _log.warn("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); } @@ -182,8 +183,6 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi msg.setPayload(payload); try { _runner.doSend(msg); - } catch (IOException ioe) { - _log.error("Error delivering the payload", ioe); } catch (I2CPMessageException ime) { _log.error("Error delivering the payload", ime); } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 7b76a64ac..f12fb42ea 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -65,8 +65,6 @@ class MessageReceivedJob extends JobImpl { _runner.doSend(msg); } catch (I2CPMessageException ime) { _log.error("Error writing out the message status message", ime); - } catch (IOException ioe) { - _log.error("Error writing out the message status message", ioe); } } } diff --git a/router/java/src/net/i2p/router/client/ReportAbuseJob.java b/router/java/src/net/i2p/router/client/ReportAbuseJob.java index e3c280f8a..2d68ce585 100644 --- a/router/java/src/net/i2p/router/client/ReportAbuseJob.java +++ b/router/java/src/net/i2p/router/client/ReportAbuseJob.java @@ -51,8 +51,6 @@ class ReportAbuseJob extends JobImpl { _runner.doSend(msg); } catch (I2CPMessageException ime) { _log.error("Error reporting abuse", ime); - } catch (IOException ioe) { - _log.error("Error reporting abuse", ioe); } } } diff --git a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java index e75705453..8195ef4a4 100644 --- a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java +++ b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java @@ -85,12 +85,6 @@ class RequestLeaseSetJob extends JobImpl { _runner.setLeaseRequest(null); _runner.disconnectClient("I2CP error requesting leaseSet"); return; - } catch (IOException ioe) { - _log.error("Error sending I2CP message requesting the lease set", ioe); - state.setIsSuccessful(false); - _runner.setLeaseRequest(null); - _runner.disconnectClient("IO error requesting leaseSet"); - return; } }