diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 2d22226d3..a56e7753d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -82,7 +82,16 @@ class PacketQueue { // this should not block! begin = _context.clock().now(); - sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent); + long expires = 0; + Connection.ResendPacketEvent rpe = (Connection.ResendPacketEvent) packet.getResendEvent(); + if (rpe != null) + // we want the router to expire it a little before we do, + // so if we retransmit it will use a new tunnel/lease combo + expires = rpe.getNextSendTime() - 500; + if (expires > 0) + sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires); + else + sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent); end = _context.clock().now(); if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) ) diff --git a/core/java/src/net/i2p/client/I2CPMessageProducer.java b/core/java/src/net/i2p/client/I2CPMessageProducer.java index 9af1fbd19..5b45ee7a3 100644 --- a/core/java/src/net/i2p/client/I2CPMessageProducer.java +++ b/core/java/src/net/i2p/client/I2CPMessageProducer.java @@ -9,6 +9,7 @@ package net.i2p.client; * */ +import java.util.Date; import java.util.Set; import net.i2p.I2PAppContext; @@ -28,6 +29,7 @@ import net.i2p.data.i2cp.DestroySessionMessage; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.ReportAbuseMessage; import net.i2p.data.i2cp.SendMessageMessage; +import net.i2p.data.i2cp.SendMessageExpiresMessage; import net.i2p.data.i2cp.SessionConfig; import net.i2p.util.Log; @@ -91,8 +93,13 @@ class I2CPMessageProducer { * */ public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag, - SessionKey key, Set tags, SessionKey newKey) throws I2PSessionException { - SendMessageMessage msg = new SendMessageMessage(); + SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException { + SendMessageMessage msg; + if (expires > 0) { + msg = new SendMessageExpiresMessage(); + ((SendMessageExpiresMessage)msg).setExpiration(new Date(expires)); + } else + msg = new SendMessageMessage(); msg.setDestination(dest); msg.setSessionId(session.getSessionId()); msg.setNonce(nonce); diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index 627d1775a..d8c64f222 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -70,6 +70,7 @@ public interface I2PSession { */ public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException; /** Receive a message that the router has notified the client about, returning * the payload. diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 78f4ba763..a57957107 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -550,10 +550,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Pass off the error to the listener */ void propogateError(String msg, Throwable error) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + "Error occurred: " + msg + " - " + error.getMessage()); - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + " cause", error); + if (_log.shouldLog(Log.ERROR)) + _log.error(getPrefix() + "Error occurred: " + msg + " - " + error.getMessage()); + if (_log.shouldLog(Log.ERROR)) + _log.error(getPrefix() + " cause", error); if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error); } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 81c6ef22f..6a90952a5 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -107,15 +107,19 @@ class I2PSessionImpl2 extends I2PSessionImpl { return sendMessage(dest, payload, 0, payload.length); } public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException { - return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64)); + return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64), 0); } @Override public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException { - return sendMessage(dest, payload, 0, payload.length, keyUsed, tagsSent); + return sendMessage(dest, payload, 0, payload.length, keyUsed, tagsSent, 0); } public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException { + return sendMessage(dest, payload, offset, size, keyUsed, tagsSent, 0); + } + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires) + throws I2PSessionException { if (_log.shouldLog(Log.DEBUG)) _log.debug("sending message"); if (isClosed()) throw new I2PSessionException("Already closed"); @@ -142,7 +146,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { } _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0); _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); - return sendBestEffort(dest, payload, keyUsed, tagsSent); + return sendBestEffort(dest, payload, keyUsed, tagsSent, expires); } /** @@ -168,7 +172,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { private static final int NUM_TAGS = 50; - private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent) + private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent, long expires) throws I2PSessionException { SessionKey key = null; SessionKey newKey = null; @@ -176,6 +180,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { Set sentTags = null; int oldTags = 0; long begin = _context.clock().now(); + /*********** if (I2CPMessageProducer.END_TO_END_CRYPTO) { if (_log.shouldLog(Log.DEBUG)) _log.debug("begin sendBestEffort"); key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey()); @@ -220,6 +225,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { } else { // not using end to end crypto, so don't ever bundle any tags } + **********/ if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce"); @@ -233,14 +239,14 @@ class I2PSessionImpl2 extends I2PSessionImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Setting key = " + key); if (keyUsed != null) { - if (I2CPMessageProducer.END_TO_END_CRYPTO) { - if (newKey != null) - keyUsed.setData(newKey.getData()); - else - keyUsed.setData(key.getData()); - } else { + //if (I2CPMessageProducer.END_TO_END_CRYPTO) { + // if (newKey != null) + // keyUsed.setData(newKey.getData()); + // else + // keyUsed.setData(key.getData()); + //} else { keyUsed.setData(SessionKey.INVALID_KEY.getData()); - } + //} } if (tagsSent != null) { if (sentTags != null) { @@ -261,7 +267,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { + state.getNonce() + " for best effort " + " sync took " + (inSendingSync-beforeSendingSync) + " add took " + (afterSendingSync-inSendingSync)); - _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); + _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires); // since this is 'best effort', all we're waiting for is a status update // saying that the router received it - in theory, that should come back diff --git a/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java b/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java index 7d6d816c1..6163771e3 100644 --- a/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java +++ b/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java @@ -21,6 +21,7 @@ import net.i2p.data.Lease; import net.i2p.data.LeaseSet; import net.i2p.data.PrivateKey; import net.i2p.data.PublicKey; +import net.i2p.data.SessionKey; import net.i2p.data.SigningPrivateKey; import net.i2p.data.SigningPublicKey; import net.i2p.data.i2cp.I2CPMessage; @@ -78,6 +79,17 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { leaseSet.setEncryptionKey(li.getPublicKey()); leaseSet.setSigningKey(li.getSigningPublicKey()); + String sk = session.getOptions().getProperty("i2cp.sessionKey"); + if (sk != null) { + SessionKey key = new SessionKey(); + try { + key.fromBase64(sk); + leaseSet.encrypt(key); + _context.keyRing().put(session.getMyDestination().calculateHash(), key); + } catch (DataFormatException dfe) { + _log.error("Bad session key: " + sk); + } + } try { leaseSet.sign(session.getPrivateKey()); session.getProducer().createLeaseSet(session, leaseSet, li.getSigningPrivateKey(), li.getPrivateKey()); @@ -137,4 +149,4 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { && DataHelper.eq(_signingPrivKey, li.getSigningPrivateKey()); } } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageHandler.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageHandler.java index 128c312dc..15045028a 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageHandler.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageHandler.java @@ -18,7 +18,7 @@ import net.i2p.data.DataHelper; import net.i2p.util.Log; /** - * Handle messages from the server for the client + * Handle messages from the server for the client or vice versa * */ public class I2CPMessageHandler { @@ -75,6 +75,8 @@ public class I2CPMessageHandler { return new RequestLeaseSetMessage(); case SendMessageMessage.MESSAGE_TYPE: return new SendMessageMessage(); + case SendMessageExpiresMessage.MESSAGE_TYPE: + return new SendMessageExpiresMessage(); case SessionStatusMessage.MESSAGE_TYPE: return new SessionStatusMessage(); case GetDateMessage.MESSAGE_TYPE: diff --git a/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java new file mode 100644 index 000000000..7165f6d32 --- /dev/null +++ b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java @@ -0,0 +1,103 @@ +package net.i2p.data.i2cp; + +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.util.Log; + +/** + * Defines the message a client sends to a router when + * updating the config on an existing session. + * + * @author zzz + */ +public class ReconfigureSessionMessage extends I2CPMessageImpl { + private final static Log _log = new Log(ReconfigureSessionMessage.class); + public final static int MESSAGE_TYPE = 2; + private SessionId _sessionId; + private SessionConfig _sessionConfig; + + public ReconfigureSessionMessage() { + _sessionId = null; + _sessionConfig = null; + } + + public SessionId getSessionId() { + return _sessionId; + } + + public void setSessionId(SessionId id) { + _sessionId = id; + } + + public SessionConfig getSessionConfig() { + return _sessionConfig; + } + + public void setSessionConfig(SessionConfig config) { + _sessionConfig = config; + } + + @Override + protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException { + try { + _sessionId = new SessionId(); + _sessionId.readBytes(in); + _sessionConfig = new SessionConfig(); + _sessionConfig.readBytes(in); + } catch (DataFormatException dfe) { + throw new I2CPMessageException("Unable to load the message data", dfe); + } + } + + @Override + protected byte[] doWriteMessage() throws I2CPMessageException, IOException { + if (_sessionId == null || _sessionConfig == null) + throw new I2CPMessageException("Unable to write out the message as there is not enough data"); + ByteArrayOutputStream os = new ByteArrayOutputStream(64); + try { + _sessionId.writeBytes(os); + _sessionConfig.writeBytes(os); + } catch (DataFormatException dfe) { + throw new I2CPMessageException("Error writing out the message data", dfe); + } + return os.toByteArray(); + } + + public int getType() { + return MESSAGE_TYPE; + } + + @Override + public boolean equals(Object object) { + if ((object != null) && (object instanceof ReconfigureSessionMessage)) { + ReconfigureSessionMessage msg = (ReconfigureSessionMessage) object; + return DataHelper.eq(getSessionId(), msg.getSessionId()) + && DataHelper.eq(getSessionConfig(), msg.getSessionConfig()); + } + + return false; + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("[ReconfigureSessionMessage: "); + buf.append("\n\tSessionId: ").append(getSessionId()); + buf.append("\n\tSessionConfig: ").append(getSessionConfig()); + buf.append("]"); + return buf.toString(); + } +} diff --git a/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java b/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java new file mode 100644 index 000000000..d15c1979c --- /dev/null +++ b/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java @@ -0,0 +1,117 @@ +package net.i2p.data.i2cp; + +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Date; + +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.data.Payload; +import net.i2p.util.Log; + +/** + * Same as SendMessageMessage, but with an expiration to be passed to the router + * + * @author zzz + */ +public class SendMessageExpiresMessage extends SendMessageMessage { + private final static Log _log = new Log(SendMessageExpiresMessage.class); + public final static int MESSAGE_TYPE = 36; + private SessionId _sessionId; + private Destination _destination; + private Payload _payload; + private Date _expiration; + + public SendMessageExpiresMessage() { + super(); + setExpiration(null); + } + + public Date getExpiration() { + return _expiration; + } + + public void setExpiration(Date d) { + _expiration = d; + } + + /** + * Read the body into the data structures + * + * @throws IOException + */ + @Override + public void readMessage(InputStream in, int length, int type) throws I2CPMessageException, IOException { + super.readMessage(in, length, type); + + try { + _expiration = DataHelper.readDate(in); + } catch (DataFormatException dfe) { + throw new I2CPMessageException("Unable to load the message data", dfe); + } + } + + /** + * Write out the full message to the stream, including the 4 byte size and 1 + * byte type header. Override the parent so we can be more mem efficient + * + * @throws IOException + */ + @Override + public void writeMessage(OutputStream out) throws I2CPMessageException, IOException { + if ((getSessionId() == null) || (getDestination() == null) || (getPayload() == null) || (getNonce() <= 0) || (_expiration == null)) + throw new I2CPMessageException("Unable to write out the message as there is not enough data"); + int len = 2 + getDestination().size() + getPayload().getSize() + 4 + 4 + DataHelper.DATE_LENGTH; + + try { + DataHelper.writeLong(out, 4, len); + DataHelper.writeLong(out, 1, getType()); + getSessionId().writeBytes(out); + getDestination().writeBytes(out); + getPayload().writeBytes(out); + DataHelper.writeLong(out, 4, getNonce()); + DataHelper.writeDate(out, _expiration); + } catch (DataFormatException dfe) { + throw new I2CPMessageException("Error writing the msg", dfe); + } + } + + public int getType() { + return MESSAGE_TYPE; + } + + @Override + public boolean equals(Object object) { + if ((object != null) && (object instanceof SendMessageExpiresMessage)) { + SendMessageExpiresMessage msg = (SendMessageExpiresMessage) object; + return super.equals(object) + && DataHelper.eq(getExpiration(), msg.getExpiration()); + } + + return false; + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("[SendMessageMessage: "); + buf.append("\n\tSessionId: ").append(getSessionId()); + buf.append("\n\tNonce: ").append(getNonce()); + buf.append("\n\tDestination: ").append(getDestination()); + buf.append("\n\tExpiration: ").append(getExpiration()); + buf.append("\n\tPayload: ").append(getPayload()); + buf.append("]"); + return buf.toString(); + } +} diff --git a/router/java/src/net/i2p/router/ClientMessage.java b/router/java/src/net/i2p/router/ClientMessage.java index 005f69a2d..ec7820d69 100644 --- a/router/java/src/net/i2p/router/ClientMessage.java +++ b/router/java/src/net/i2p/router/ClientMessage.java @@ -27,6 +27,7 @@ public class ClientMessage { private SessionConfig _senderConfig; private Hash _destinationHash; private MessageId _messageId; + private long _expiration; public ClientMessage() { setPayload(null); @@ -36,6 +37,7 @@ public class ClientMessage { setSenderConfig(null); setDestinationHash(null); setMessageId(null); + setExpiration(0); } /** @@ -91,4 +93,12 @@ public class ClientMessage { */ public SessionConfig getSenderConfig() { return _senderConfig; } public void setSenderConfig(SessionConfig config) { _senderConfig = config; } + + /** + * Expiration requested by the client that sent the message. This will only be available + * for locally originated messages. + * + */ + public long getExpiration() { return _expiration; } + public void setExpiration(long e) { _expiration = e; } } diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 544badcad..133ad142c 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -29,6 +29,7 @@ import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessageStatusMessage; import net.i2p.data.i2cp.SendMessageMessage; +import net.i2p.data.i2cp.SendMessageExpiresMessage; import net.i2p.data.i2cp.SessionConfig; import net.i2p.data.i2cp.SessionId; import net.i2p.router.Job; @@ -270,6 +271,9 @@ public class ClientConnectionRunner { Destination dest = message.getDestination(); MessageId id = new MessageId(); id.setMessageId(getNextMessageId()); + long expiration = 0; + if (message instanceof SendMessageExpiresMessage) + expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime(); long beforeLock = _context.clock().now(); long inLock = 0; synchronized (_acceptedPending) { @@ -291,7 +295,7 @@ public class ClientConnectionRunner { // the following blocks as described above SessionConfig cfg = _config; if (cfg != null) - _manager.distributeMessage(cfg.getDestination(), dest, payload, id); + _manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration); long timeToDistribute = _context.clock().now() - beforeDistribute; if (_log.shouldLog(Log.DEBUG)) _log.warn("Time to distribute in the manager to " diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index d9838ef7b..18c9c7742 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -140,7 +140,7 @@ public class ClientManager { } } - void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId) { + void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration) { // check if there is a runner for it ClientConnectionRunner runner = getRunner(toDest); if (runner != null) { @@ -168,6 +168,7 @@ public class ClientManager { msg.setSenderConfig(runner.getConfig()); msg.setFromDestination(runner.getConfig().getDestination()); msg.setMessageId(msgId); + msg.setExpiration(expiration); _ctx.clientMessagePool().add(msg, true); } } diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 033e28f2f..d36d26401 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -21,7 +21,9 @@ import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.ReceiveMessageBeginMessage; import net.i2p.data.i2cp.ReceiveMessageEndMessage; +import net.i2p.data.i2cp.ReconfigureSessionMessage; import net.i2p.data.i2cp.SendMessageMessage; +import net.i2p.data.i2cp.SendMessageExpiresMessage; import net.i2p.data.i2cp.SessionId; import net.i2p.data.i2cp.SessionStatusMessage; import net.i2p.data.i2cp.SetDateMessage; @@ -67,6 +69,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi case SendMessageMessage.MESSAGE_TYPE: handleSendMessage(reader, (SendMessageMessage)message); break; + case SendMessageExpiresMessage.MESSAGE_TYPE: + handleSendMessage(reader, (SendMessageExpiresMessage)message); + break; case ReceiveMessageBeginMessage.MESSAGE_TYPE: handleReceiveBegin(reader, (ReceiveMessageBeginMessage)message); break; @@ -237,6 +242,17 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash())); } + /** + * Message's Session ID ignored. This doesn't support removing previously set options. + * Nor do we bother with message.getSessionConfig().verifySignature() ... should we? + * + */ + private void handleReconfigureSession(I2CPMessageReader reader, ReconfigureSessionMessage message) { + if (_log.shouldLog(Log.INFO)) + _log.info("Updating options - session " + _runner.getSessionId()); + _runner.getConfig().getOptions().putAll(message.getSessionConfig().getOptions()); + } + // this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME private final static int MAX_SESSION_ID = 32767; diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index ccef8192a..e7c369b1e 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -69,6 +69,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { */ public final static String OVERALL_TIMEOUT_MS_PARAM = "clientMessageTimeout"; private final static long OVERALL_TIMEOUT_MS_DEFAULT = 60*1000; + private final static long OVERALL_TIMEOUT_MS_MIN = 5*1000; /** priority of messages, that might get honored some day... */ private final static int SEND_PRIORITY = 500; @@ -125,23 +126,34 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _to = msg.getDestination(); _toString = _to.calculateHash().toBase64().substring(0,4); _leaseSetLookupBegin = -1; - - String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM); - if (param == null) - param = ctx.router().getConfigSetting(OVERALL_TIMEOUT_MS_PARAM); - if (param != null) { - try { - timeoutMs = Long.parseLong(param); - } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid client message timeout specified [" + param - + "], defaulting to " + OVERALL_TIMEOUT_MS_DEFAULT, nfe); - timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; - } - } - _start = getContext().clock().now(); - _overallExpiration = timeoutMs + _start; + + // use expiration requested by client if available, otherwise session config, + // otherwise router config, otherwise default + _overallExpiration = msg.getExpiration(); + if (_overallExpiration > 0) { + _overallExpiration = Math.max(_overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN); + _overallExpiration = Math.min(_overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT); + if (_log.shouldLog(Log.WARN)) + _log.warn("Message Expiration (ms): " + (_overallExpiration - _start)); + } else { + String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM); + if (param == null) + param = ctx.router().getConfigSetting(OVERALL_TIMEOUT_MS_PARAM); + if (param != null) { + try { + timeoutMs = Long.parseLong(param); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid client message timeout specified [" + param + + "], defaulting to " + OVERALL_TIMEOUT_MS_DEFAULT, nfe); + timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; + } + } + _overallExpiration = timeoutMs + _start; + if (_log.shouldLog(Log.WARN)) + _log.warn("Default Expiration (ms): " + timeoutMs); + } _finished = false; } @@ -445,6 +457,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } boolean wantACK = true; int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey()); + // what's the point of 5% random? possible improvements or replacements: + // - wantACK if we changed their inbound lease + // - wantACK if we changed our outbound tunnel (requires moving selectOutboundTunnel() before this) + // - wantACK if we haven't in last 1m (requires a new static cache probably) if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) ) wantACK = false;