* Streaming, I2CP, Client Message sending:

Pass message timeout through new I2CP message
       SendMessageExpiresMessage, so that the router
       uses the same expiration as the streaming lib.
       Should help reliability.
     * I2CP:
       Implement new I2CP message ReconfigureSessionMessage.
       Will be used for tunnel reduction.
This commit is contained in:
zzz
2009-01-20 17:22:56 +00:00
parent ab92206b77
commit 6be54942ec
14 changed files with 343 additions and 39 deletions

View File

@ -27,6 +27,7 @@ public class ClientMessage {
private SessionConfig _senderConfig;
private Hash _destinationHash;
private MessageId _messageId;
private long _expiration;
public ClientMessage() {
setPayload(null);
@ -36,6 +37,7 @@ public class ClientMessage {
setSenderConfig(null);
setDestinationHash(null);
setMessageId(null);
setExpiration(0);
}
/**
@ -91,4 +93,12 @@ public class ClientMessage {
*/
public SessionConfig getSenderConfig() { return _senderConfig; }
public void setSenderConfig(SessionConfig config) { _senderConfig = config; }
/**
* Expiration requested by the client that sent the message. This will only be available
* for locally originated messages.
*
*/
public long getExpiration() { return _expiration; }
public void setExpiration(long e) { _expiration = e; }
}

View File

@ -29,6 +29,7 @@ import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.data.i2cp.SendMessageMessage;
import net.i2p.data.i2cp.SendMessageExpiresMessage;
import net.i2p.data.i2cp.SessionConfig;
import net.i2p.data.i2cp.SessionId;
import net.i2p.router.Job;
@ -270,6 +271,9 @@ public class ClientConnectionRunner {
Destination dest = message.getDestination();
MessageId id = new MessageId();
id.setMessageId(getNextMessageId());
long expiration = 0;
if (message instanceof SendMessageExpiresMessage)
expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime();
long beforeLock = _context.clock().now();
long inLock = 0;
synchronized (_acceptedPending) {
@ -291,7 +295,7 @@ public class ClientConnectionRunner {
// the following blocks as described above
SessionConfig cfg = _config;
if (cfg != null)
_manager.distributeMessage(cfg.getDestination(), dest, payload, id);
_manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration);
long timeToDistribute = _context.clock().now() - beforeDistribute;
if (_log.shouldLog(Log.DEBUG))
_log.warn("Time to distribute in the manager to "

View File

@ -140,7 +140,7 @@ public class ClientManager {
}
}
void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId) {
void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration) {
// check if there is a runner for it
ClientConnectionRunner runner = getRunner(toDest);
if (runner != null) {
@ -168,6 +168,7 @@ public class ClientManager {
msg.setSenderConfig(runner.getConfig());
msg.setFromDestination(runner.getConfig().getDestination());
msg.setMessageId(msgId);
msg.setExpiration(expiration);
_ctx.clientMessagePool().add(msg, true);
}
}

View File

@ -21,7 +21,9 @@ import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.ReceiveMessageBeginMessage;
import net.i2p.data.i2cp.ReceiveMessageEndMessage;
import net.i2p.data.i2cp.ReconfigureSessionMessage;
import net.i2p.data.i2cp.SendMessageMessage;
import net.i2p.data.i2cp.SendMessageExpiresMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.data.i2cp.SessionStatusMessage;
import net.i2p.data.i2cp.SetDateMessage;
@ -67,6 +69,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
case SendMessageMessage.MESSAGE_TYPE:
handleSendMessage(reader, (SendMessageMessage)message);
break;
case SendMessageExpiresMessage.MESSAGE_TYPE:
handleSendMessage(reader, (SendMessageExpiresMessage)message);
break;
case ReceiveMessageBeginMessage.MESSAGE_TYPE:
handleReceiveBegin(reader, (ReceiveMessageBeginMessage)message);
break;
@ -237,6 +242,17 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash()));
}
/**
* Message's Session ID ignored. This doesn't support removing previously set options.
* Nor do we bother with message.getSessionConfig().verifySignature() ... should we?
*
*/
private void handleReconfigureSession(I2CPMessageReader reader, ReconfigureSessionMessage message) {
if (_log.shouldLog(Log.INFO))
_log.info("Updating options - session " + _runner.getSessionId());
_runner.getConfig().getOptions().putAll(message.getSessionConfig().getOptions());
}
// this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME
private final static int MAX_SESSION_ID = 32767;

View File

@ -69,6 +69,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*/
public final static String OVERALL_TIMEOUT_MS_PARAM = "clientMessageTimeout";
private final static long OVERALL_TIMEOUT_MS_DEFAULT = 60*1000;
private final static long OVERALL_TIMEOUT_MS_MIN = 5*1000;
/** priority of messages, that might get honored some day... */
private final static int SEND_PRIORITY = 500;
@ -125,23 +126,34 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_to = msg.getDestination();
_toString = _to.calculateHash().toBase64().substring(0,4);
_leaseSetLookupBegin = -1;
String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
if (param == null)
param = ctx.router().getConfigSetting(OVERALL_TIMEOUT_MS_PARAM);
if (param != null) {
try {
timeoutMs = Long.parseLong(param);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid client message timeout specified [" + param
+ "], defaulting to " + OVERALL_TIMEOUT_MS_DEFAULT, nfe);
timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
}
}
_start = getContext().clock().now();
_overallExpiration = timeoutMs + _start;
// use expiration requested by client if available, otherwise session config,
// otherwise router config, otherwise default
_overallExpiration = msg.getExpiration();
if (_overallExpiration > 0) {
_overallExpiration = Math.max(_overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN);
_overallExpiration = Math.min(_overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT);
if (_log.shouldLog(Log.WARN))
_log.warn("Message Expiration (ms): " + (_overallExpiration - _start));
} else {
String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
if (param == null)
param = ctx.router().getConfigSetting(OVERALL_TIMEOUT_MS_PARAM);
if (param != null) {
try {
timeoutMs = Long.parseLong(param);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid client message timeout specified [" + param
+ "], defaulting to " + OVERALL_TIMEOUT_MS_DEFAULT, nfe);
timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
}
}
_overallExpiration = timeoutMs + _start;
if (_log.shouldLog(Log.WARN))
_log.warn("Default Expiration (ms): " + timeoutMs);
}
_finished = false;
}
@ -445,6 +457,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
boolean wantACK = true;
int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey());
// what's the point of 5% random? possible improvements or replacements:
// - wantACK if we changed their inbound lease
// - wantACK if we changed our outbound tunnel (requires moving selectOutboundTunnel() before this)
// - wantACK if we haven't in last 1m (requires a new static cache probably)
if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) )
wantACK = false;