forked from I2P_Developers/i2p.i2p
* Router:
- Look for DeliveryStatusMessages beyond the message expiration, so we don't throw out a tagset that gets acked late - Allow re-adding of a "failed" tagset to the SKM - Extend max message age in MessageValidator - Remove unused and confusing timeout param when registering a selector - Log tweaks, javadocs, cleanups
This commit is contained in:
@ -318,8 +318,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
|
|||||||
private OutboundSession createAndReturnSession(PublicKey target, SessionKey key) {
|
private OutboundSession createAndReturnSession(PublicKey target, SessionKey key) {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("New OB session, sesskey: " + key + " target: " + toString(target));
|
_log.info("New OB session, sesskey: " + key + " target: " + toString(target));
|
||||||
OutboundSession sess = new OutboundSession(_context, _log, target);
|
OutboundSession sess = new OutboundSession(_context, _log, target, key);
|
||||||
sess.setCurrentKey(key);
|
|
||||||
addSession(sess);
|
addSession(sess);
|
||||||
return sess;
|
return sess;
|
||||||
}
|
}
|
||||||
@ -476,6 +475,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark these tags as acked, start to use them (if we haven't already)
|
* Mark these tags as acked, start to use them (if we haven't already)
|
||||||
|
* If the set was previously failed, it will be added back in.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void tagsAcked(PublicKey target, SessionKey key, TagSetHandle ts) {
|
public void tagsAcked(PublicKey target, SessionKey key, TagSetHandle ts) {
|
||||||
@ -822,11 +822,13 @@ public class TransientSessionKeyManager extends SessionKeyManager {
|
|||||||
* on the callers to call failTags() or ackTags() to remove them from this list.
|
* on the callers to call failTags() or ackTags() to remove them from this list.
|
||||||
* Actually we now do a failsafe expire.
|
* Actually we now do a failsafe expire.
|
||||||
* Synch on _tagSets to access this.
|
* Synch on _tagSets to access this.
|
||||||
|
* No particular order.
|
||||||
*/
|
*/
|
||||||
private final List<TagSet> _unackedTagSets;
|
private final Set<TagSet> _unackedTagSets;
|
||||||
/**
|
/**
|
||||||
* As tagsets are acked, they go here.
|
* As tagsets are acked, they go here.
|
||||||
* After the first ack, new tagsets go here (i.e. presumed acked)
|
* After the first ack, new tagsets go here (i.e. presumed acked)
|
||||||
|
* In order, earliest first.
|
||||||
*/
|
*/
|
||||||
private final List<TagSet> _tagSets;
|
private final List<TagSet> _tagSets;
|
||||||
/**
|
/**
|
||||||
@ -844,20 +846,15 @@ public class TransientSessionKeyManager extends SessionKeyManager {
|
|||||||
|
|
||||||
private static final int MAX_FAILS = 2;
|
private static final int MAX_FAILS = 2;
|
||||||
|
|
||||||
public OutboundSession(I2PAppContext ctx, Log log, PublicKey target) {
|
public OutboundSession(I2PAppContext ctx, Log log, PublicKey target, SessionKey key) {
|
||||||
this(ctx, log, target, null, ctx.clock().now(), ctx.clock().now(), new ArrayList<TagSet>());
|
|
||||||
}
|
|
||||||
|
|
||||||
OutboundSession(I2PAppContext ctx, Log log, PublicKey target, SessionKey curKey,
|
|
||||||
long established, long lastUsed, List<TagSet> tagSets) {
|
|
||||||
_context = ctx;
|
_context = ctx;
|
||||||
_log = log;
|
_log = log;
|
||||||
_target = target;
|
_target = target;
|
||||||
_currentKey = curKey;
|
_currentKey = key;
|
||||||
_established = established;
|
_established = ctx.clock().now();
|
||||||
_lastUsed = lastUsed;
|
_lastUsed = _established;
|
||||||
_unackedTagSets = tagSets;
|
_unackedTagSets = new HashSet<TagSet>(4);
|
||||||
_tagSets = new ArrayList<TagSet>();
|
_tagSets = new ArrayList<TagSet>(6);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -878,6 +875,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
|
|||||||
* got an ack for these tags
|
* got an ack for these tags
|
||||||
* For tagsets delivered after the session was acked, this is a nop
|
* For tagsets delivered after the session was acked, this is a nop
|
||||||
* because the tagset was originally placed directly on the acked list.
|
* because the tagset was originally placed directly on the acked list.
|
||||||
|
* If the set was previously failed, it will be added back in.
|
||||||
*/
|
*/
|
||||||
void ackTags(TagSet set) {
|
void ackTags(TagSet set) {
|
||||||
synchronized (_tagSets) {
|
synchronized (_tagSets) {
|
||||||
@ -885,10 +883,13 @@ public class TransientSessionKeyManager extends SessionKeyManager {
|
|||||||
// we could perhaps use it even if not previuosly in unacked,
|
// we could perhaps use it even if not previuosly in unacked,
|
||||||
// i.e. it was expired already, but _tagSets is a list not a set...
|
// i.e. it was expired already, but _tagSets is a list not a set...
|
||||||
_tagSets.add(set);
|
_tagSets.add(set);
|
||||||
} else if (_log.shouldLog(Log.WARN)) {
|
} else if (!_tagSets.contains(set)) {
|
||||||
if(!_tagSets.contains(set))
|
// add back (sucess after fail)
|
||||||
_log.warn("Ack of unknown tagset: " + set);
|
_tagSets.add(set);
|
||||||
else if (set.getAcked())
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn("Ack of unknown (previously failed?) tagset: " + set);
|
||||||
|
} else if (set.getAcked()) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Dup ack of tagset: " + set);
|
_log.warn("Dup ack of tagset: " + set);
|
||||||
}
|
}
|
||||||
_acked = true;
|
_acked = true;
|
||||||
@ -1196,6 +1197,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
|
|||||||
buf.append("TagSet #").append(_id).append(" created: ").append(new Date(_date));
|
buf.append("TagSet #").append(_id).append(" created: ").append(new Date(_date));
|
||||||
buf.append(" Session key: ").append(_key);
|
buf.append(" Session key: ").append(_key);
|
||||||
buf.append(" Size: ").append(_sessionTags.size());
|
buf.append(" Size: ").append(_sessionTags.size());
|
||||||
|
buf.append('/').append(_origSize);
|
||||||
buf.append(" Acked? ").append(_acked);
|
buf.append(" Acked? ").append(_acked);
|
||||||
return buf.toString();
|
return buf.toString();
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,11 @@
|
|||||||
|
2014-03-07 zzz
|
||||||
|
* Router:
|
||||||
|
- Look for DeliveryStatusMessages beyond the message expiration,
|
||||||
|
so we don't throw out a tagset that gets acked late
|
||||||
|
- Allow re-adding of a "failed" tagset to the SKM
|
||||||
|
- Extend max message age in MessageValidator
|
||||||
|
- Remove unused and confusing timeout param when registering a selector
|
||||||
|
|
||||||
2014-03-06 zzz
|
2014-03-06 zzz
|
||||||
* Router: Encrypt DeliveryStatusMessages sent in garlics (ticket #1217)
|
* Router: Encrypt DeliveryStatusMessages sent in garlics (ticket #1217)
|
||||||
|
|
||||||
|
@ -50,12 +50,13 @@ public class MessageValidator {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Only check the expiration for the message
|
* Only check the expiration for the message
|
||||||
*/
|
*/
|
||||||
public String validateMessage(long expiration) {
|
public String validateMessage(long expiration) {
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
if (now - Router.CLOCK_FUDGE_FACTOR >= expiration) {
|
if (now - (Router.CLOCK_FUDGE_FACTOR * 3 / 2) >= expiration) {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Rejecting message because it expired " + (now-expiration) + "ms ago");
|
_log.info("Rejecting message because it expired " + (now-expiration) + "ms ago");
|
||||||
_context.statManager().addRateData("router.invalidMessageTime", (now-expiration), 0);
|
_context.statManager().addRateData("router.invalidMessageTime", (now-expiration), 0);
|
||||||
|
@ -81,11 +81,11 @@ public class OutNetMessage implements CDPQEntry {
|
|||||||
public static final int PRIORITY_LOWEST = 100;
|
public static final int PRIORITY_LOWEST = 100;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Null msg and target (used in OutboundMessageRegistry only)
|
* Null msg and target, zero expiration (used in OutboundMessageRegistry only)
|
||||||
* @since 0.9.9
|
* @since 0.9.9
|
||||||
*/
|
*/
|
||||||
public OutNetMessage(RouterContext context, long expiration) {
|
public OutNetMessage(RouterContext context) {
|
||||||
this(context, null, expiration, -1, null);
|
this(context, null, 0, -1, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -182,13 +182,13 @@ public class OutNetMessage implements CDPQEntry {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Specifies the router to which the message should be delivered.
|
* Specifies the router to which the message should be delivered.
|
||||||
*
|
* Generally non-null but may be null in special cases.
|
||||||
*/
|
*/
|
||||||
public RouterInfo getTarget() { return _target; }
|
public RouterInfo getTarget() { return _target; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specifies the message to be sent
|
* Specifies the message to be sent.
|
||||||
*
|
* Generally non-null but may be null in special cases.
|
||||||
*/
|
*/
|
||||||
public I2NPMessage getMessage() { return _message; }
|
public I2NPMessage getMessage() { return _message; }
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ public class RouterVersion {
|
|||||||
/** deprecated */
|
/** deprecated */
|
||||||
public final static String ID = "Monotone";
|
public final static String ID = "Monotone";
|
||||||
public final static String VERSION = CoreVersion.VERSION;
|
public final static String VERSION = CoreVersion.VERSION;
|
||||||
public final static long BUILD = 14;
|
public final static long BUILD = 15;
|
||||||
|
|
||||||
/** for example "-test" */
|
/** for example "-test" */
|
||||||
public final static String EXTRA = "";
|
public final static String EXTRA = "";
|
||||||
|
@ -2,15 +2,16 @@ package net.i2p.router.message;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import net.i2p.client.SendMessageOptions;
|
import net.i2p.client.SendMessageOptions;
|
||||||
import net.i2p.crypto.SessionKeyManager;
|
import net.i2p.crypto.SessionKeyManager;
|
||||||
import net.i2p.crypto.TagSetHandle;
|
import net.i2p.crypto.TagSetHandle;
|
||||||
import net.i2p.data.Certificate;
|
import net.i2p.data.Certificate;
|
||||||
|
import net.i2p.data.DataHelper;
|
||||||
import net.i2p.data.Destination;
|
import net.i2p.data.Destination;
|
||||||
import net.i2p.data.Hash;
|
import net.i2p.data.Hash;
|
||||||
import net.i2p.data.Lease;
|
import net.i2p.data.Lease;
|
||||||
@ -56,10 +57,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
private LeaseSet _leaseSet;
|
private LeaseSet _leaseSet;
|
||||||
/** Actual lease the message is being routed through */
|
/** Actual lease the message is being routed through */
|
||||||
private Lease _lease;
|
private Lease _lease;
|
||||||
private PayloadGarlicConfig _clove;
|
|
||||||
private long _cloveId;
|
|
||||||
private final long _start;
|
private final long _start;
|
||||||
private final AtomicBoolean _finished = new AtomicBoolean();
|
/** note we can succeed after failure, but not vice versa */
|
||||||
|
private enum Result {NONE, FAIL, SUCCESS}
|
||||||
|
private Result _finished = Result.NONE;
|
||||||
private long _leaseSetLookupBegin;
|
private long _leaseSetLookupBegin;
|
||||||
private TunnelInfo _outTunnel;
|
private TunnelInfo _outTunnel;
|
||||||
private TunnelInfo _inTunnel;
|
private TunnelInfo _inTunnel;
|
||||||
@ -70,6 +71,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
*/
|
*/
|
||||||
private final OutboundCache.HashPair _hashPair;
|
private final OutboundCache.HashPair _hashPair;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* final timeout (in milliseconds) that the outbound message will fail in.
|
* final timeout (in milliseconds) that the outbound message will fail in.
|
||||||
* This can be overridden in the router.config or the client's session config
|
* This can be overridden in the router.config or the client's session config
|
||||||
@ -78,6 +80,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
public final static String OVERALL_TIMEOUT_MS_PARAM = "clientMessageTimeout";
|
public final static String OVERALL_TIMEOUT_MS_PARAM = "clientMessageTimeout";
|
||||||
private final static long OVERALL_TIMEOUT_MS_DEFAULT = 60*1000;
|
private final static long OVERALL_TIMEOUT_MS_DEFAULT = 60*1000;
|
||||||
private final static long OVERALL_TIMEOUT_MS_MIN = 8*1000;
|
private final static long OVERALL_TIMEOUT_MS_MIN = 8*1000;
|
||||||
|
private final static long REPLY_TIMEOUT_MS_MIN = OVERALL_TIMEOUT_MS_DEFAULT - 5*1000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* NOTE: Changed as of 0.9.2.
|
* NOTE: Changed as of 0.9.2.
|
||||||
@ -210,7 +213,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
SendJob success = new SendJob(getContext());
|
SendJob success = new SendJob(getContext());
|
||||||
_leaseSet = getContext().netDb().lookupLeaseSetLocally(key);
|
_leaseSet = getContext().netDb().lookupLeaseSetLocally(key);
|
||||||
if (_leaseSet != null) {
|
if (_leaseSet != null) {
|
||||||
getContext().statManager().addRateData("client.leaseSetFoundLocally", 1, 0);
|
getContext().statManager().addRateData("client.leaseSetFoundLocally", 1);
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug(getJobId() + ": Send outbound client message - leaseSet found locally for " + _toString);
|
_log.debug(getJobId() + ": Send outbound client message - leaseSet found locally for " + _toString);
|
||||||
success.runJob();
|
success.runJob();
|
||||||
@ -252,16 +255,22 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
return newLS;
|
return newLS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** send a message to a lease */
|
/**
|
||||||
|
* Send a message to a lease.
|
||||||
|
* Note: This is generally run inline by runJob() above.
|
||||||
|
* It is only run on the job queue after a LS lookup.
|
||||||
|
*/
|
||||||
private class SendJob extends JobImpl {
|
private class SendJob extends JobImpl {
|
||||||
public SendJob(RouterContext enclosingContext) {
|
public SendJob(RouterContext enclosingContext) {
|
||||||
super(enclosingContext);
|
super(enclosingContext);
|
||||||
}
|
}
|
||||||
public String getName() { return "Outbound client message send"; }
|
|
||||||
|
public String getName() { return "Outbound client message delayed send"; }
|
||||||
|
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
if (_leaseSetLookupBegin > 0) {
|
if (_leaseSetLookupBegin > 0) {
|
||||||
long lookupTime = getContext().clock().now() - _leaseSetLookupBegin;
|
long lookupTime = getContext().clock().now() - _leaseSetLookupBegin;
|
||||||
getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, 0);
|
getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime);
|
||||||
}
|
}
|
||||||
_wantACK = false;
|
_wantACK = false;
|
||||||
boolean ok = getNextLease();
|
boolean ok = getNextLease();
|
||||||
@ -407,10 +416,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime);
|
getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!_finished.get()) {
|
//if (_finished == Result.NONE) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Unable to send to " + _toString + " because we couldn't find their leaseSet");
|
_log.warn("Unable to send to " + _toString + " because we couldn't find their leaseSet");
|
||||||
}
|
//}
|
||||||
|
|
||||||
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET);
|
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET);
|
||||||
}
|
}
|
||||||
@ -425,7 +434,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private void send() {
|
private void send() {
|
||||||
if (_finished.get()) return;
|
synchronized(this) {
|
||||||
|
if (_finished != Result.NONE) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn(OutboundClientMessageOneShotJob.this.getJobId()
|
||||||
|
+ ": SEND-AFTER-" + _finished);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
long now = getContext().clock().now();
|
long now = getContext().clock().now();
|
||||||
if (now >= _overallExpiration) {
|
if (now >= _overallExpiration) {
|
||||||
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED);
|
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED);
|
||||||
@ -433,6 +449,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_outTunnel = selectOutboundTunnel(_to);
|
_outTunnel = selectOutboundTunnel(_to);
|
||||||
|
if (_outTunnel == null) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while");
|
||||||
|
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start);
|
||||||
|
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5;
|
// boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5;
|
||||||
// what's the point of 5% random? possible improvements or replacements:
|
// what's the point of 5% random? possible improvements or replacements:
|
||||||
// DONE (getNextLease() is called before this): wantACK if we changed their inbound lease (getNextLease() sets _wantACK)
|
// DONE (getNextLease() is called before this): wantACK if we changed their inbound lease (getNextLease() sets _wantACK)
|
||||||
@ -451,10 +475,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(),
|
GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(),
|
||||||
_from.calculateHash(), tagsRequired);
|
_from.calculateHash(), tagsRequired);
|
||||||
|
|
||||||
PublicKey key = _leaseSet.getEncryptionKey();
|
|
||||||
SessionKey sessKey = new SessionKey();
|
|
||||||
Set<SessionTag> tags = new HashSet<SessionTag>();
|
|
||||||
|
|
||||||
LeaseSet replyLeaseSet;
|
LeaseSet replyLeaseSet;
|
||||||
// Per-message flag == false overrides session option which is default true
|
// Per-message flag == false overrides session option which is default true
|
||||||
String allow = _clientMessage.getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET);
|
String allow = _clientMessage.getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET);
|
||||||
@ -482,19 +502,24 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
token = -1;
|
token = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean ok = buildClove();
|
PayloadGarlicConfig clove = buildClove();
|
||||||
if (!ok) {
|
if (clove == null) {
|
||||||
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION);
|
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
//if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
// _log.debug(getJobId() + ": Clove built to " + _toString);
|
// _log.debug(getJobId() + ": Clove built to " + _toString);
|
||||||
|
|
||||||
|
PublicKey key = _leaseSet.getEncryptionKey();
|
||||||
|
SessionKey sessKey = new SessionKey();
|
||||||
|
Set<SessionTag> tags = new HashSet<SessionTag>();
|
||||||
|
|
||||||
long msgExpiration = _overallExpiration; // getContext().clock().now() + OVERALL_TIMEOUT_MS_DEFAULT;
|
long msgExpiration = _overallExpiration; // getContext().clock().now() + OVERALL_TIMEOUT_MS_DEFAULT;
|
||||||
// Per-message flag > 0 overrides per-session option
|
// Per-message flag > 0 overrides per-session option
|
||||||
int tagsToSend = SendMessageOptions.getTagsToSend(sendFlags);
|
int tagsToSend = SendMessageOptions.getTagsToSend(sendFlags);
|
||||||
GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token,
|
GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token,
|
||||||
msgExpiration, key,
|
msgExpiration, key,
|
||||||
_clove, _from.calculateHash(),
|
clove, _from.calculateHash(),
|
||||||
_to, _inTunnel, tagsToSend,
|
_to, _inTunnel, tagsToSend,
|
||||||
tagsRequired, sessKey, tags,
|
tagsRequired, sessKey, tags,
|
||||||
wantACK, replyLeaseSet);
|
wantACK, replyLeaseSet);
|
||||||
@ -504,7 +529,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
// we dont receive the reply? hmm...)
|
// we dont receive the reply? hmm...)
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left or too lagged) to " + _toString);
|
_log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left or too lagged) to " + _toString);
|
||||||
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0);
|
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start);
|
||||||
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS);
|
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -524,71 +549,95 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
}
|
}
|
||||||
onReply = new SendSuccessJob(getContext(), sessKey, tsh);
|
onReply = new SendSuccessJob(getContext(), sessKey, tsh);
|
||||||
onFail = new SendTimeoutJob(getContext(), sessKey, tsh);
|
onFail = new SendTimeoutJob(getContext(), sessKey, tsh);
|
||||||
selector = new ReplySelector(token);
|
long expiration = Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN);
|
||||||
|
selector = new ReplySelector(token, expiration);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getJobId() + ": GarlicMessage in new tunnel msg for "
|
|
||||||
+ _toString + " at "
|
|
||||||
+ _lease.getTunnelId() + " on "
|
|
||||||
+ _lease.getGateway());
|
|
||||||
|
|
||||||
if (_outTunnel != null) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug(getJobId() + ": Sending msg out " + _outTunnel.getSendTunnelId(0) + " to "
|
_log.debug(getJobId() + ": Sending msg out " + _outTunnel.getSendTunnelId(0) + " to "
|
||||||
+ _toString + " at "
|
+ _toString + " at "
|
||||||
+ _lease.getTunnelId() + " on "
|
+ _lease.getTunnelId() + " on "
|
||||||
+ _lease.getGateway());
|
+ _lease.getGateway());
|
||||||
|
|
||||||
DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now()));
|
DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail);
|
||||||
//if (false) // dispatch may take 100+ms, so toss it in its own job
|
//if (false) // dispatch may take 100+ms, so toss it in its own job
|
||||||
// getContext().jobQueue().addJob(dispatchJob);
|
// getContext().jobQueue().addJob(dispatchJob);
|
||||||
//else
|
//else
|
||||||
dispatchJob.runJob();
|
dispatchJob.runJob();
|
||||||
} else {
|
getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start);
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while");
|
|
||||||
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0);
|
|
||||||
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS);
|
|
||||||
}
|
|
||||||
_clove = null;
|
|
||||||
getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0);
|
|
||||||
if (!wantACK)
|
if (!wantACK)
|
||||||
getContext().statManager().addRateData("client.dispatchNoACK", 1, 0);
|
getContext().statManager().addRateData("client.dispatchNoACK", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Note: This is run inline by send(), not on the job queue.
|
||||||
|
* TODO replace with a method
|
||||||
|
*/
|
||||||
private class DispatchJob extends JobImpl {
|
private class DispatchJob extends JobImpl {
|
||||||
private final GarlicMessage _msg;
|
private final GarlicMessage _msg;
|
||||||
private final ReplySelector _selector;
|
private final ReplySelector _selector;
|
||||||
private final SendSuccessJob _replyFound;
|
private final SendSuccessJob _replyFound;
|
||||||
private final SendTimeoutJob _replyTimeout;
|
private final SendTimeoutJob _replyTimeout;
|
||||||
private final int _timeoutMs;
|
|
||||||
|
|
||||||
public DispatchJob(RouterContext ctx, GarlicMessage msg, ReplySelector sel, SendSuccessJob success, SendTimeoutJob timeout, int timeoutMs) {
|
/**
|
||||||
|
* @param sel may be null
|
||||||
|
* @param success non-null if sel non-null
|
||||||
|
* @param timeout non-null if sel non-null
|
||||||
|
*/
|
||||||
|
public DispatchJob(RouterContext ctx, GarlicMessage msg, ReplySelector sel,
|
||||||
|
SendSuccessJob success, SendTimeoutJob timeout) {
|
||||||
super(ctx);
|
super(ctx);
|
||||||
_msg = msg;
|
_msg = msg;
|
||||||
_selector = sel;
|
_selector = sel;
|
||||||
_replyFound = success;
|
_replyFound = success;
|
||||||
_replyTimeout = timeout;
|
_replyTimeout = timeout;
|
||||||
_timeoutMs = timeoutMs;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getName() { return "Outbound client message dispatch"; }
|
public String getName() { return "Outbound client message dispatch"; }
|
||||||
|
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
if (_selector != null)
|
if (_selector != null) {
|
||||||
getContext().messageRegistry().registerPending(_selector, _replyFound, _replyTimeout, _timeoutMs);
|
if (_overallExpiration >= _selector.getExpiration()) {
|
||||||
|
// We use the Message Registry to call our timeout when the selector expires
|
||||||
|
// Either the success or timeout job will fire, never both.
|
||||||
|
getContext().messageRegistry().registerPending(_selector, _replyFound, _replyTimeout);
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info(OutboundClientMessageOneShotJob.this.getJobId() +
|
||||||
|
": Reply selector expires " +
|
||||||
|
DataHelper.formatDuration(_overallExpiration - _selector.getExpiration()) +
|
||||||
|
" before message, using selector only");
|
||||||
|
} else {
|
||||||
|
// We put our own timeout on the job queue before the selector expires,
|
||||||
|
// so we can keep waiting for the reply and restore the tags (success-after-failure)
|
||||||
|
// The timeout job will always fire, even after success.
|
||||||
|
// We don't bother cancelling the timeout job as JobQueue.removeJob() is a linear search
|
||||||
|
getContext().messageRegistry().registerPending(_selector, _replyFound, null);
|
||||||
|
_replyTimeout.getTiming().setStartAfter(_overallExpiration);
|
||||||
|
getContext().jobQueue().addJob(_replyTimeout);
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info(OutboundClientMessageOneShotJob.this.getJobId() +
|
||||||
|
": Reply selector expires " +
|
||||||
|
DataHelper.formatDuration(_selector.getExpiration() - _overallExpiration) +
|
||||||
|
" after message, queueing separate timeout job");
|
||||||
|
}
|
||||||
|
}
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info(OutboundClientMessageOneShotJob.this.getJobId() +
|
_log.info(OutboundClientMessageOneShotJob.this.getJobId() +
|
||||||
": Dispatching message to " + _toString + ": " + _msg);
|
": Dispatching message to " + _toString + ": " + _msg);
|
||||||
long before = getContext().clock().now();
|
long before = getContext().clock().now();
|
||||||
|
|
||||||
|
// Note we do not have a first hop fail job, or a success job, here,
|
||||||
|
// as we do in e.g. build handler.
|
||||||
|
// Nor do we ever send a STATUS_SEND_BEST_EFFORT_SUCCESS (when no selector)
|
||||||
getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway());
|
getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway());
|
||||||
long dispatchSendTime = getContext().clock().now() - before;
|
long dispatchSendTime = getContext().clock().now() - before;
|
||||||
//if (_log.shouldLog(Log.INFO))
|
//if (_log.shouldLog(Log.INFO))
|
||||||
// _log.info(OutboundClientMessageOneShotJob.this.getJobId() +
|
// _log.info(OutboundClientMessageOneShotJob.this.getJobId() +
|
||||||
// ": Dispatching message to " + _toString + " complete");
|
// ": Dispatching message to " + _toString + " complete");
|
||||||
getContext().statManager().addRateData("client.dispatchTime", getContext().clock().now() - _start, 0);
|
// avg. 6 ms on a 2005-era PC
|
||||||
getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime, 0);
|
getContext().statManager().addRateData("client.dispatchTime", getContext().clock().now() - _start);
|
||||||
|
// avg. 1 ms on a 2005-era PC
|
||||||
|
getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -690,10 +739,19 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
* give up the ghost, this message just aint going through. tell the client.
|
* give up the ghost, this message just aint going through. tell the client.
|
||||||
*
|
*
|
||||||
* this is safe to call multiple times (only tells the client once)
|
* this is safe to call multiple times (only tells the client once)
|
||||||
|
* We may still succeed later.
|
||||||
*/
|
*/
|
||||||
private void dieFatal(int status) {
|
private void dieFatal(int status) {
|
||||||
if (_finished.getAndSet(true))
|
// never fail twice or fail after success
|
||||||
|
synchronized(this) {
|
||||||
|
if (_finished != Result.NONE) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn(OutboundClientMessageOneShotJob.this.getJobId()
|
||||||
|
+ ": FAIL-AFTER-" + _finished);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
_finished = Result.FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
long sendTime = getContext().clock().now() - _start;
|
long sendTime = getContext().clock().now() - _start;
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
@ -713,11 +771,15 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
|
getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
|
||||||
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, status);
|
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, status);
|
||||||
getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
|
getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
|
||||||
_clove = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** build the payload clove that will be used for all of the messages, placing the clove in the status structure */
|
/**
|
||||||
private boolean buildClove() {
|
* Build the payload clove that will be used for all of the messages,
|
||||||
|
* placing the clove in the status structure.
|
||||||
|
*
|
||||||
|
* @return null on failure
|
||||||
|
*/
|
||||||
|
private PayloadGarlicConfig buildClove() {
|
||||||
PayloadGarlicConfig clove = new PayloadGarlicConfig();
|
PayloadGarlicConfig clove = new PayloadGarlicConfig();
|
||||||
|
|
||||||
DeliveryInstructions instructions = new DeliveryInstructions();
|
DeliveryInstructions instructions = new DeliveryInstructions();
|
||||||
@ -737,10 +799,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
DataMessage msg = new DataMessage(getContext());
|
DataMessage msg = new DataMessage(getContext());
|
||||||
Payload p = _clientMessage.getPayload();
|
Payload p = _clientMessage.getPayload();
|
||||||
if (p == null)
|
if (p == null)
|
||||||
return false;
|
return null;
|
||||||
byte d[] = p.getEncryptedData();
|
byte d[] = p.getEncryptedData();
|
||||||
if (d == null)
|
if (d == null)
|
||||||
return false;
|
return null;
|
||||||
msg.setData(d);
|
msg.setData(d);
|
||||||
msg.setMessageExpiration(clove.getExpiration());
|
msg.setMessageExpiration(clove.getExpiration());
|
||||||
|
|
||||||
@ -749,12 +811,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
//clove.setRecipientPublicKey(null);
|
//clove.setRecipientPublicKey(null);
|
||||||
//clove.setRequestAck(false);
|
//clove.setRequestAck(false);
|
||||||
|
|
||||||
_clove = clove;
|
|
||||||
_cloveId = _clove.getId();
|
|
||||||
|
|
||||||
//if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
// _log.debug(getJobId() + ": Built payload clove with id " + clove.getId());
|
// _log.debug(getJobId() + ": Built payload clove with id " + clove.getId());
|
||||||
return true;
|
return clove;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -762,23 +822,20 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
* sent down the various tunnels to deliver this message
|
* sent down the various tunnels to deliver this message
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private class ReplySelector implements MessageSelector {
|
private static class ReplySelector implements MessageSelector {
|
||||||
private final long _pendingToken;
|
private final long _pendingToken;
|
||||||
|
private final long _expiration;
|
||||||
|
|
||||||
public ReplySelector(long token) {
|
public ReplySelector(long token, long expiration) {
|
||||||
_pendingToken = token;
|
_pendingToken = token;
|
||||||
//if (_log.shouldLog(Log.INFO))
|
_expiration = expiration;
|
||||||
// _log.info(OutboundClientMessageOneShotJob.this.getJobId()
|
|
||||||
// + ": Reply selector for client message: token=" + token);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean continueMatching() {
|
public boolean continueMatching() {
|
||||||
//if (_log.shouldLog(Log.DEBUG))
|
|
||||||
// _log.debug(OutboundClientMessageOneShotJob.this.getJobId()
|
|
||||||
// + ": dont continue matching for token=" + _pendingToken);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
public long getExpiration() { return _overallExpiration; }
|
|
||||||
|
public long getExpiration() { return _expiration; }
|
||||||
|
|
||||||
public boolean isMatch(I2NPMessage inMsg) {
|
public boolean isMatch(I2NPMessage inMsg) {
|
||||||
if (inMsg.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
|
if (inMsg.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
|
||||||
@ -793,14 +850,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "sending " + _toString + " waiting for token " + _pendingToken
|
return "OCMOSJ.RS waiting for token " + _pendingToken + " until " + new Date(_expiration);
|
||||||
+ " for cloveId " + _cloveId;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called after we get a confirmation that the message was delivered safely
|
* Called after we get a confirmation that the message was delivered safely.
|
||||||
* (hoo-ray!)
|
* This may be run after failure.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private class SendSuccessJob extends JobImpl implements ReplyJob {
|
private class SendSuccessJob extends JobImpl implements ReplyJob {
|
||||||
@ -810,7 +866,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
/**
|
/**
|
||||||
* Create a new success job that will be fired when the message encrypted with
|
* Create a new success job that will be fired when the message encrypted with
|
||||||
* the given session key and bearing the specified tags are confirmed delivered.
|
* the given session key and bearing the specified tags are confirmed delivered.
|
||||||
|
* This is only instantiated if we are expecting a reply.
|
||||||
*
|
*
|
||||||
|
* @param key may be null
|
||||||
|
* @param tags may be null
|
||||||
*/
|
*/
|
||||||
public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) {
|
public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) {
|
||||||
super(enclosingContext);
|
super(enclosingContext);
|
||||||
@ -820,24 +879,44 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
|
|
||||||
public String getName() { return "Outbound client message send success"; }
|
public String getName() { return "Outbound client message send success"; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* May be run after SendTimeoutJob, will re-add the tags.
|
||||||
|
*/
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
// do we leak tags here?
|
// do we leak tags here?
|
||||||
if (_finished.getAndSet(true))
|
Result old;
|
||||||
|
// never succeed twice but we can succeed after fail
|
||||||
|
synchronized(OutboundClientMessageOneShotJob.this) {
|
||||||
|
old = _finished;
|
||||||
|
if (old == Result.SUCCESS) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn(OutboundClientMessageOneShotJob.this.getJobId()
|
||||||
|
+ ": SUCCESS-AFTER-SUCCESS");
|
||||||
return;
|
return;
|
||||||
long sendTime = getContext().clock().now() - _start;
|
}
|
||||||
if (_log.shouldLog(Log.INFO))
|
_finished = Result.SUCCESS;
|
||||||
_log.info(OutboundClientMessageOneShotJob.this.getJobId()
|
// in sync block so we don't race with SendTimeoutJob
|
||||||
+ ": SUCCESS! msg " + _clientMessageId
|
|
||||||
+ " acked by DSM after " + sendTime + "ms");
|
|
||||||
|
|
||||||
if (_key != null && _tags != null && _leaseSet != null) {
|
if (_key != null && _tags != null && _leaseSet != null) {
|
||||||
SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
|
SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
|
||||||
if (skm != null)
|
if (skm != null)
|
||||||
skm.tagsAcked(_leaseSet.getEncryptionKey(), _key, _tags);
|
skm.tagsAcked(_leaseSet.getEncryptionKey(), _key, _tags);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
long dataMsgId = _cloveId;
|
long sendTime = getContext().clock().now() - _start;
|
||||||
getContext().messageHistory().sendPayloadMessage(dataMsgId, true, sendTime);
|
if (old == Result.FAIL) {
|
||||||
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
_log.warn(OutboundClientMessageOneShotJob.this.getJobId()
|
||||||
|
+ ": SUCCESS-AFTER-TIMEOUT " + _clientMessageId
|
||||||
|
+ " acked by DSM after " + sendTime + "ms");
|
||||||
|
} else if (_log.shouldLog(Log.INFO)) {
|
||||||
|
_log.info(OutboundClientMessageOneShotJob.this.getJobId()
|
||||||
|
+ ": SUCCESS " + _clientMessageId
|
||||||
|
+ " acked by DSM after " + sendTime + "ms");
|
||||||
|
}
|
||||||
|
|
||||||
|
//long dataMsgId = _cloveId; // fake ID 99999
|
||||||
|
getContext().messageHistory().sendPayloadMessage(99999, true, sendTime);
|
||||||
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId,
|
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId,
|
||||||
MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
|
MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
|
||||||
// unused
|
// unused
|
||||||
@ -845,7 +924,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
|
|
||||||
int size = _clientMessageSize;
|
int size = _clientMessageSize;
|
||||||
|
|
||||||
getContext().statManager().addRateData("client.sendAckTime", sendTime, 0);
|
getContext().statManager().addRateData("client.sendAckTime", sendTime);
|
||||||
getContext().statManager().addRateData("client.sendMessageSize", _clientMessageSize, sendTime);
|
getContext().statManager().addRateData("client.sendMessageSize", _clientMessageSize, sendTime);
|
||||||
if (_outTunnel != null) {
|
if (_outTunnel != null) {
|
||||||
if (_outTunnel.getLength() > 0)
|
if (_outTunnel.getLength() > 0)
|
||||||
@ -874,6 +953,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
private final SessionKey _key;
|
private final SessionKey _key;
|
||||||
private final TagSetHandle _tags;
|
private final TagSetHandle _tags;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new timeout job that will be fired when the reply is not received.
|
||||||
|
* This is only instantiated if we are expecting a reply.
|
||||||
|
*
|
||||||
|
* @param key may be null
|
||||||
|
* @param tags may be null
|
||||||
|
*/
|
||||||
public SendTimeoutJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) {
|
public SendTimeoutJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) {
|
||||||
super(enclosingContext);
|
super(enclosingContext);
|
||||||
_key = key;
|
_key = key;
|
||||||
@ -882,18 +968,28 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
|||||||
|
|
||||||
public String getName() { return "Outbound client message send timeout"; }
|
public String getName() { return "Outbound client message send timeout"; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* May be run after SendSuccessJob, will have no effect.
|
||||||
|
*/
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
//if (_log.shouldLog(Log.INFO))
|
Result old;
|
||||||
// _log.info(OutboundClientMessageOneShotJob.this.getJobId()
|
// never fail after success
|
||||||
// + ": Soft timeout through the lease " + _lease);
|
synchronized(OutboundClientMessageOneShotJob.this) {
|
||||||
|
old = _finished;
|
||||||
// unused
|
if (old == Result.SUCCESS) {
|
||||||
//_lease.setNumFailure(_lease.getNumFailure()+1);
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info(OutboundClientMessageOneShotJob.this.getJobId()
|
||||||
|
+ ": TIMEOUT-AFTER-SUCCESS");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// in sync block so we don't fail after success
|
||||||
if (_key != null && _tags != null && _leaseSet != null) {
|
if (_key != null && _tags != null && _leaseSet != null) {
|
||||||
SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
|
SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
|
||||||
if (skm != null)
|
if (skm != null)
|
||||||
skm.failTags(_leaseSet.getEncryptionKey(), _key, _tags);
|
skm.failTags(_leaseSet.getEncryptionKey(), _key, _tags);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (old == Result.NONE)
|
||||||
dieFatal(MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE);
|
dieFatal(MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
|||||||
|
|
||||||
// This OutNetMessage is never used or sent (setMessage() is never called), it's only
|
// This OutNetMessage is never used or sent (setMessage() is never called), it's only
|
||||||
// so we can register a reply selector.
|
// so we can register a reply selector.
|
||||||
_out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
|
_out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout);
|
||||||
|
|
||||||
/********
|
/********
|
||||||
// We need to randomize our ff selection, else we stay with the same ones since
|
// We need to randomize our ff selection, else we stay with the same ones since
|
||||||
|
@ -157,7 +157,9 @@ class FloodfillVerifyStoreJob extends JobImpl {
|
|||||||
_log.info("Starting verify (stored " + _key + " to " + _sentTo + "), asking " + _target);
|
_log.info("Starting verify (stored " + _key + " to " + _sentTo + "), asking " + _target);
|
||||||
_sendTime = getContext().clock().now();
|
_sendTime = getContext().clock().now();
|
||||||
_expiration = _sendTime + VERIFY_TIMEOUT;
|
_expiration = _sendTime + VERIFY_TIMEOUT;
|
||||||
getContext().messageRegistry().registerPending(new VerifyReplySelector(), new VerifyReplyJob(getContext()), new VerifyTimeoutJob(getContext()), VERIFY_TIMEOUT);
|
getContext().messageRegistry().registerPending(new VerifyReplySelector(),
|
||||||
|
new VerifyReplyJob(getContext()),
|
||||||
|
new VerifyTimeoutJob(getContext()));
|
||||||
getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), _target);
|
getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), _target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +185,7 @@ class IterativeSearchJob extends FloodSearchJob {
|
|||||||
MessageSelector replySelector = new IterativeLookupSelector(getContext(), this);
|
MessageSelector replySelector = new IterativeLookupSelector(getContext(), this);
|
||||||
ReplyJob onReply = new FloodOnlyLookupMatchJob(getContext(), this);
|
ReplyJob onReply = new FloodOnlyLookupMatchJob(getContext(), this);
|
||||||
Job onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
|
Job onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
|
||||||
_out = getContext().messageRegistry().registerPending(replySelector, onReply, onTimeout, _timeoutMs);
|
_out = getContext().messageRegistry().registerPending(replySelector, onReply, onTimeout);
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info(getJobId() + ": New ISJ for " +
|
_log.info(getJobId() + ": New ISJ for " +
|
||||||
(_isLease ? "LS " : "RI ") +
|
(_isLease ? "LS " : "RI ") +
|
||||||
|
@ -447,7 +447,7 @@ class SearchJob extends JobImpl {
|
|||||||
|
|
||||||
if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
|
if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
|
||||||
_floodfillSearchesOutstanding++;
|
_floodfillSearchesOutstanding++;
|
||||||
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout);
|
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router));
|
||||||
// TODO pass a priority to the dispatcher
|
// TODO pass a priority to the dispatcher
|
||||||
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, to);
|
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, to);
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,7 @@ class SingleSearchJob extends FloodOnlySearchJob {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runJob() {
|
public void runJob() {
|
||||||
_onm = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
|
_onm = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout);
|
||||||
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
|
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
|
||||||
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundExploratoryTunnel(_to);
|
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundExploratoryTunnel(_to);
|
||||||
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(_to);
|
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(_to);
|
||||||
|
@ -376,7 +376,7 @@ class StoreJob extends JobImpl {
|
|||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg);
|
_log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg);
|
||||||
getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now()));
|
getContext().messageRegistry().registerPending(selector, onReply, onFail);
|
||||||
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, to);
|
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, to);
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
@ -456,7 +456,7 @@ class StoreJob extends JobImpl {
|
|||||||
_log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent);
|
_log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent);
|
||||||
//_log.debug("Expiration is " + new Date(sent.getMessageExpiration()));
|
//_log.debug("Expiration is " + new Date(sent.getMessageExpiration()));
|
||||||
}
|
}
|
||||||
getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now()));
|
getContext().messageRegistry().registerPending(selector, onReply, onFail);
|
||||||
getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to);
|
getContext().tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to);
|
||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
@ -149,7 +149,7 @@ public class PeerTestJob extends JobImpl {
|
|||||||
PeerReplyFoundJob reply = new PeerReplyFoundJob(getContext(), peer, inTunnel, outTunnel);
|
PeerReplyFoundJob reply = new PeerReplyFoundJob(getContext(), peer, inTunnel, outTunnel);
|
||||||
PeerReplyTimeoutJob timeoutJob = new PeerReplyTimeoutJob(getContext(), peer, inTunnel, outTunnel, sel);
|
PeerReplyTimeoutJob timeoutJob = new PeerReplyTimeoutJob(getContext(), peer, inTunnel, outTunnel, sel);
|
||||||
|
|
||||||
getContext().messageRegistry().registerPending(sel, reply, timeoutJob, timeoutMs);
|
getContext().messageRegistry().registerPending(sel, reply, timeoutJob);
|
||||||
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, null, peer.getIdentity().getHash());
|
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, null, peer.getIdentity().getHash());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +121,7 @@ public class OutboundMessageRegistry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<OutNetMessage> rv = null;
|
List<OutNetMessage> rv;
|
||||||
if (matchedSelectors != null) {
|
if (matchedSelectors != null) {
|
||||||
rv = new ArrayList<OutNetMessage>(matchedSelectors.size());
|
rv = new ArrayList<OutNetMessage>(matchedSelectors.size());
|
||||||
for (MessageSelector sel : matchedSelectors) {
|
for (MessageSelector sel : matchedSelectors) {
|
||||||
@ -162,19 +162,22 @@ public class OutboundMessageRegistry {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers a new, empty OutNetMessage, with the reply and timeout jobs specified.
|
* Registers a new, empty OutNetMessage, with the reply and timeout jobs specified.
|
||||||
|
* The onTimeout job is called at replySelector.getExpiration() (if no reply is received by then)
|
||||||
*
|
*
|
||||||
* @param replySelector non-null; The same selector may be used for more than one message.
|
* @param replySelector non-null; The same selector may be used for more than one message.
|
||||||
* @param onReply may be null
|
* @param onReply non-null
|
||||||
* @param onTimeout Also called on failed send; may be null
|
* @param onTimeout may be null
|
||||||
* @return an ONM where getMessage() is null. Use it to call unregisterPending() later if desired.
|
* @return a dummy OutNetMessage where getMessage() is null. Use it to call unregisterPending() later if desired.
|
||||||
*/
|
*/
|
||||||
public OutNetMessage registerPending(MessageSelector replySelector, ReplyJob onReply, Job onTimeout, int timeoutMs) {
|
public OutNetMessage registerPending(MessageSelector replySelector, ReplyJob onReply, Job onTimeout) {
|
||||||
OutNetMessage msg = new OutNetMessage(_context, _context.clock().now() + timeoutMs);
|
OutNetMessage msg = new OutNetMessage(_context);
|
||||||
msg.setOnFailedReplyJob(onTimeout);
|
msg.setOnFailedReplyJob(onTimeout);
|
||||||
msg.setOnFailedSendJob(onTimeout);
|
|
||||||
msg.setOnReplyJob(onReply);
|
msg.setOnReplyJob(onReply);
|
||||||
msg.setReplySelector(replySelector);
|
msg.setReplySelector(replySelector);
|
||||||
registerPending(msg, true);
|
registerPending(msg, true);
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Registered: " + replySelector + " with reply job " + onReply +
|
||||||
|
" and timeout job " + onTimeout);
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -279,6 +282,7 @@ public class OutboundMessageRegistry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
boolean log = _log.shouldLog(Log.DEBUG);
|
||||||
if (!removing.isEmpty()) {
|
if (!removing.isEmpty()) {
|
||||||
for (MessageSelector sel : removing) {
|
for (MessageSelector sel : removing) {
|
||||||
OutNetMessage msg = null;
|
OutNetMessage msg = null;
|
||||||
@ -297,17 +301,31 @@ public class OutboundMessageRegistry {
|
|||||||
Job fail = msg.getOnFailedReplyJob();
|
Job fail = msg.getOnFailedReplyJob();
|
||||||
if (fail != null)
|
if (fail != null)
|
||||||
_context.jobQueue().addJob(fail);
|
_context.jobQueue().addJob(fail);
|
||||||
|
if (log)
|
||||||
|
_log.debug("Expired: " + sel + " with timeout job " + fail);
|
||||||
} else if (msgs != null) {
|
} else if (msgs != null) {
|
||||||
_activeMessages.removeAll(msgs);
|
_activeMessages.removeAll(msgs);
|
||||||
for (OutNetMessage m : msgs) {
|
for (OutNetMessage m : msgs) {
|
||||||
Job fail = m.getOnFailedReplyJob();
|
Job fail = m.getOnFailedReplyJob();
|
||||||
if (fail != null)
|
if (fail != null)
|
||||||
_context.jobQueue().addJob(fail);
|
_context.jobQueue().addJob(fail);
|
||||||
|
if (log)
|
||||||
|
_log.debug("Expired: " + sel + " with timeout job(s) " + fail);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if (log)
|
||||||
|
_log.debug("Expired: " + sel + " with no known messages");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (log) {
|
||||||
|
int e = removing.size();
|
||||||
|
int r = _selectors.size();
|
||||||
|
int a = _activeMessages.size();
|
||||||
|
if (r > 0 || e > 0 || a > 0)
|
||||||
|
_log.debug("Expired: " + e + " remaining: " + r + " active: " + a);
|
||||||
|
}
|
||||||
if (_nextExpire <= now)
|
if (_nextExpire <= now)
|
||||||
_nextExpire = now + 10*1000;
|
_nextExpire = now + 10*1000;
|
||||||
schedule(_nextExpire - now);
|
schedule(_nextExpire - now);
|
||||||
|
@ -104,7 +104,7 @@ class TestJob extends JobImpl {
|
|||||||
ReplySelector sel = new ReplySelector(getContext(), m.getMessageId(), testExpiration);
|
ReplySelector sel = new ReplySelector(getContext(), m.getMessageId(), testExpiration);
|
||||||
OnTestReply onReply = new OnTestReply(getContext());
|
OnTestReply onReply = new OnTestReply(getContext());
|
||||||
OnTestTimeout onTimeout = new OnTestTimeout(getContext());
|
OnTestTimeout onTimeout = new OnTestTimeout(getContext());
|
||||||
OutNetMessage msg = getContext().messageRegistry().registerPending(sel, onReply, onTimeout, testPeriod);
|
OutNetMessage msg = getContext().messageRegistry().registerPending(sel, onReply, onTimeout);
|
||||||
onReply.setSentMessage(msg);
|
onReply.setSentMessage(msg);
|
||||||
sendTest(m);
|
sendTest(m);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user