diff --git a/core/java/src/net/i2p/client/I2PClient.java b/core/java/src/net/i2p/client/I2PClient.java index d0f4ac0500..521ed35b5b 100644 --- a/core/java/src/net/i2p/client/I2PClient.java +++ b/core/java/src/net/i2p/client/I2PClient.java @@ -40,6 +40,24 @@ public interface I2PClient { /** @since 0.8.1 */ public final static String PROP_RELIABILITY_NONE = "none"; + /** + * For router->client payloads. + * + * If false, the router will send the MessageStatus, + * the client must respond with a ReceiveMessageBegin, + * the router will send the MessagePayload, + * and the client respond with a ReceiveMessageEnd. + * + * If true, the router will send the MessagePayload immediately, + * and will not send a MessageStatus. + * The client will not send ReceiveMessageBegin or ReceiveMessageEnd. + * + * Default false, but the implementation in this package sets to true. + * + * @since 0.9.4 + */ + public final static String PROP_FAST_RECEIVE = "i2cp.fastReceive"; + /** protocol flag that must be sent when opening the i2cp connection to the router */ public final static int PROTOCOL_BYTE = 0x2A; diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 010092cfd6..f710d623e4 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -136,6 +136,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa private long _lastActivity; private boolean _isReduced; + private final boolean _fastReceive; /** * @since 0.8.9 @@ -168,6 +169,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (options == null) options = (Properties) System.getProperties().clone(); loadConfig(options); + _fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE)); } /** @@ -228,6 +230,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _options.setProperty("i2cp.password", configPW); } } + if (_options.getProperty(I2PClient.PROP_FAST_RECEIVE) == null) + _options.setProperty(I2PClient.PROP_FAST_RECEIVE, "true"); + if (_options.getProperty(I2PClient.PROP_RELIABILITY) == null) + _options.setProperty(I2PClient.PROP_RELIABILITY, "none"); } /** save some memory, don't pass along the pointless properties */ @@ -280,6 +286,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } catch (I2PSessionException ise) {} } + /** + * @since 0.9.4 + */ + public boolean getFastReceive() { + return _fastReceive; + } + void setLeaseSet(LeaseSet ls) { _leaseSet = ls; if (ls != null) { diff --git a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java index 7b294e3b6e..214a5609bd 100644 --- a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java +++ b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java @@ -40,10 +40,19 @@ class MessagePayloadMessageHandler extends HandlerImpl { decryptPayload(msg, session); session.addNewMessage(msg); - ReceiveMessageEndMessage m = new ReceiveMessageEndMessage(); - m.setMessageId(id); - m.setSessionId(msg.getSessionId()); - session.sendMessage(m); + // Small chance of this, but + // if we are a new I2P lib talking to an old router + // and we don't send this, the router will OOM as it has + // no cleaner for old messages. + // TODO after 0.9.4 is out, check router version from handshake + // and send it all the time if 0.9.3 or less + // (needs router version saving support in SetDateMessageHandler) + //if (!session.getFastReceive()) { + ReceiveMessageEndMessage m = new ReceiveMessageEndMessage(); + m.setMessageId(id); + m.setSessionId(msg.getSessionId()); + session.sendMessage(m); + //} } catch (DataFormatException dfe) { session.propogateError("Error handling a new payload message", dfe); } catch (I2PSessionException ise) { diff --git a/history.txt b/history.txt index 8cd011371e..2b94ee3531 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,20 @@ +2012-11-02 zzz + * configstats: Fix group sorting, translate groups + * I2CP: + - Better fix for logging dropped messages (ticket #758) + - Implement fast receive to reduce per-message handshakes + - Make messageReliability=none the default + * i2psnark: + - Split buckets correctly + - More exploration fixes + * i2ptunnel: + - Better privkey backup file name + - Revert increment of privkey tunnel name + - Move deleted privkeys to backup dir + - Fix jsp build dependencies + - Fix layout issue on Chrome (ticket #757) + * KeyManager: Eliminate races, buffer I/O, eliminate periodic syncing + 2012-10-31 zzz * FIFOBandwidthRefiller: Reduce refill interval to smooth output * I2CP: Reduce log level when outbound queue is full (ticket #758) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 0725033fa6..f6e6df3a5c 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 3; + public final static long BUILD = 4; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 446ccd3c07..cfe23fa986 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -90,6 +90,8 @@ class ClientConnectionRunner { private volatile boolean _dead; /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */ private boolean _dontSendMSM; + /** For inbound traffic. true if i2cp.fastReceive = "true"; @since 0.9.4 */ + private boolean _dontSendMSMOnReceive; private final AtomicInteger _messageId; // messageId counter // Was 32767 since the beginning (04-2004). @@ -113,6 +115,7 @@ class ClientConnectionRunner { _log = _context.logManager().getLog(ClientConnectionRunner.class); _manager = manager; _socket = socket; + // unused for fastReceive _messages = new ConcurrentHashMap(); _alreadyProcessed = new ArrayList(); _acceptedPending = new ConcurrentHashSet(); @@ -136,7 +139,6 @@ class ClientConnectionRunner { I2PThread t = new I2PThread(_writer); t.setName("I2CP Writer " + __id.incrementAndGet()); t.setDaemon(true); - t.setPriority(I2PThread.MAX_PRIORITY); t.start(); _out = new BufferedOutputStream(_socket.getOutputStream()); _reader.startReading(); @@ -200,15 +202,24 @@ class ClientConnectionRunner { /** already closed? */ boolean isDead() { return _dead; } - /** message body */ + /** + * Only call if _dontSendMSMOnReceive is false, otherwise will always be null + */ Payload getPayload(MessageId id) { return _messages.get(id); } + /** + * Only call if _dontSendMSMOnReceive is false + */ void setPayload(MessageId id, Payload payload) { - _messages.put(id, payload); + if (!_dontSendMSMOnReceive) + _messages.put(id, payload); } + /** + * Only call if _dontSendMSMOnReceive is false + */ void removePayload(MessageId id) { _messages.remove(id); } @@ -221,8 +232,10 @@ class ClientConnectionRunner { // We process a few options here, but most are handled by the tunnel manager. // The ones here can't be changed later. Properties opts = config.getOptions(); - if (opts != null) - _dontSendMSM = "none".equals(config.getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); + if (opts != null) { + _dontSendMSM = "none".equals(opts.getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); + _dontSendMSMOnReceive = Boolean.parseBoolean(opts.getProperty(I2PClient.PROP_FAST_RECEIVE)); + } // per-destination session key manager to prevent rather easy correlation if (_sessionKeyManager == null) { int tags = TransientSessionKeyManager.DEFAULT_TAGS; @@ -306,7 +319,7 @@ class ClientConnectionRunner { doSend(msg); } catch (I2CPMessageException ime) { if (_log.shouldLog(Log.WARN)) - _log.warn("Error writing out the disconnect message: " + ime); + _log.warn("Error writing out the disconnect message", ime); } // give it a little time to get sent out... // even better would be to have stopRunning() flush it? @@ -378,7 +391,8 @@ class ClientConnectionRunner { doSend(status); _acceptedPending.remove(id); } catch (I2CPMessageException ime) { - _log.error("Error writing out the message status message: " + ime); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error writing out the message status message", ime); } } @@ -388,7 +402,7 @@ class ClientConnectionRunner { */ void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { if (_dead) return; - MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload); + MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload, _dontSendMSMOnReceive); // This is fast and non-blocking, run in-line //_context.jobQueue().addJob(j); j.runJob(); @@ -680,7 +694,8 @@ class ClientConnectionRunner { try { doSend(msg); } catch (I2CPMessageException ime) { - _log.warn("Error updating the status for message ID " + _messageId, ime); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error updating the status for message ID " + _messageId, ime); } } } diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 23f60fb13a..9a895af734 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -69,8 +69,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi */ public void messageReceived(I2CPMessageReader reader, I2CPMessage message) { if (_runner.isDead()) return; - if (_log.shouldLog(Log.INFO)) - _log.info("Message recieved: \n" + message); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message received: \n" + message); switch (message.getType()) { case GetDateMessage.MESSAGE_TYPE: handleGetDate(reader, (GetDateMessage)message); @@ -225,7 +225,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi MessageId id = _runner.distributeMessage(message); long timeToDistribute = _context.clock().now() - beforeDistribute; _runner.ackSendMessage(id, message.getNonce()); - _context.statManager().addRateData("client.distributeTime", timeToDistribute, timeToDistribute); + _context.statManager().addRateData("client.distributeTime", timeToDistribute); if ( (timeToDistribute > 50) && (_log.shouldLog(Log.WARN)) ) _log.warn("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); } @@ -253,7 +253,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi try { _runner.doSend(msg); } catch (I2CPMessageException ime) { - _log.error("Error delivering the payload", ime); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error delivering the payload", ime); + _runner.removePayload(new MessageId(message.getMessageId())); } } @@ -330,7 +332,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi try { _runner.doSend(msg); } catch (I2CPMessageException ime) { - _log.error("Error writing out the session status message", ime); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error writing out the session status message", ime); } } @@ -348,7 +351,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi try { _runner.doSend(msg); } catch (I2CPMessageException ime) { - _log.error("Error writing out the session status message", ime); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error writing out the session status message", ime); } } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 8b16153de0..0ee148282a 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -12,40 +12,51 @@ import net.i2p.data.Destination; import net.i2p.data.Payload; import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.MessageId; +import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.MessageStatusMessage; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; /** - * Async job to notify the client that a new message is available for them + * Async job to notify the client that a new message is available for them, + * or just send it directly if specified. * */ class MessageReceivedJob extends JobImpl { private final Log _log; private final ClientConnectionRunner _runner; private final Payload _payload; + private final boolean _sendDirect; - public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest, Destination fromDest, Payload payload) { + public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest, + Destination fromDest, Payload payload, boolean sendDirect) { super(ctx); _log = ctx.logManager().getLog(MessageReceivedJob.class); _runner = runner; _payload = payload; + _sendDirect = sendDirect; } public String getName() { return "Deliver New Message"; } public void runJob() { if (_runner.isDead()) return; - MessageId id = new MessageId(); - id.setMessageId(_runner.getNextMessageId()); - _runner.setPayload(id, _payload); + MessageId id = null; + long nextID = _runner.getNextMessageId(); try { - messageAvailable(id, _payload.getSize()); + if (_sendDirect) { + sendMessage(nextID); + } else { + id = new MessageId(nextID); + _runner.setPayload(id, _payload); + messageAvailable(id, _payload.getSize()); + } } catch (I2CPMessageException ime) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the message status message", ime); - _runner.removePayload(id); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error writing out the message", ime); + if (!_sendDirect) + _runner.removePayload(id); } } @@ -65,4 +76,16 @@ class MessageReceivedJob extends JobImpl { msg.setStatus(MessageStatusMessage.STATUS_AVAILABLE); _runner.doSend(msg); } + + /** + * Deliver the message directly, skip notification + * @since 0.9.4 + */ + private void sendMessage(long id) throws I2CPMessageException { + MessagePayloadMessage msg = new MessagePayloadMessage(); + msg.setMessageId(id); + msg.setSessionId(_runner.getSessionId().getSessionId()); + msg.setPayload(_payload); + _runner.doSend(msg); + } } diff --git a/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java index aa2e9415c3..82849a4ac0 100644 --- a/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java @@ -60,16 +60,13 @@ class QueuedClientConnectionRunner extends ClientConnectionRunner { /** * Actually send the I2CPMessage to the client. * Nonblocking. + * @throws I2CPMessageException if queue full or on other errors */ @Override void doSend(I2CPMessage msg) throws I2CPMessageException { - // This will never fail, for now, as the router uses unbounded queues - // Perhaps in the future we may want to use bounded queues, - // with non-blocking writes for the router - // and blocking writes for the client? boolean success = queue.offer(msg); - if (!success && _log.shouldLog(Log.WARN)) - _log.warn("I2CP write to queue failed: " + msg); + if (!success) + throw new I2CPMessageException("I2CP write to queue failed"); } }