OCMOSJ: Keep bundling LS until acked

log tweaks
This commit is contained in:
zzz
2019-10-27 12:24:08 +00:00
parent 591b994b75
commit 3d75b3dc31
2 changed files with 34 additions and 12 deletions

View File

@ -67,7 +67,7 @@ public class OutboundCache {
* *
* Concurrent. * Concurrent.
*/ */
final Map<HashPair, LeaseSet> leaseSetCache = new ConcurrentHashMap<HashPair, LeaseSet>(64); final ConcurrentHashMap<HashPair, LeaseSet> leaseSetCache = new ConcurrentHashMap<HashPair, LeaseSet>(64);
/** /**
* Use the same inbound tunnel (i.e. lease) as we did for the same destination previously, * Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,

View File

@ -326,22 +326,26 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
return null; // punt return null; // punt
// If the last leaseSet we sent him is still good, don't bother sending again // If the last leaseSet we sent him is still good, don't bother sending again
LeaseSet ls = _cache.leaseSetCache.put(_hashPair, newLS); // As of 0.9.44, we do not put it in the cache here, we wait until it is acked
// and do it in SendSuccessJob.
if (!force) { if (!force) {
LeaseSet ls = _cache.leaseSetCache.get(_hashPair);
if (ls != null) { if (ls != null) {
if (ls.equals(newLS)) { if (ls.getDate() >= newLS.getDate()) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Found in cache - NOT including reply leaseset for " + _toString); _log.info(getJobId() + ": LS already acked - NOT sending reply LS to " + _toString);
return null; return null;
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Expired from cache - reply leaseset for " + _toString); _log.info(getJobId() + ": Expired from cache - sending reply LS to " + _toString);
} }
} else {
if (_log.shouldInfo())
_log.info(getJobId() + ": Not acked - sending reply LS to " + _toString);
} }
} }
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Added to cache - reply leaseset for " + _toString);
return newLS; return newLS;
} }
@ -367,9 +371,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (rc == 0) { if (rc == 0) {
send(); send();
} else { } else {
// shouldn't happen // shouldn't happen unless unsupported encryption
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")"); _log.warn("Got the lease but can't send to it, failure code " + rc + " (to=" + _toString + ")");
dieFatal(rc); dieFatal(rc);
} }
} }
@ -478,7 +482,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// randomize the ordering (so leases with equal # of failures per next // randomize the ordering (so leases with equal # of failures per next
// sort are randomly ordered) // sort are randomly ordered)
Collections.shuffle(leases, getContext().random()); if (leases.size() > 1)
Collections.shuffle(leases, getContext().random());
/**** /****
if (false) { if (false) {
@ -680,7 +685,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (skm != null) if (skm != null)
tsh = skm.tagsDelivered(_encryptionKey, sessKey, tags); tsh = skm.tagsDelivered(_encryptionKey, sessKey, tags);
} }
onReply = new SendSuccessJob(getContext(), sessKey, tsh); onReply = new SendSuccessJob(getContext(), sessKey, tsh, replyLeaseSet);
onFail = new SendTimeoutJob(getContext(), sessKey, tsh); onFail = new SendTimeoutJob(getContext(), sessKey, tsh);
long expiration = Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN); long expiration = Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN);
selector = new ReplySelector(token, expiration); selector = new ReplySelector(token, expiration);
@ -996,6 +1001,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private class SendSuccessJob extends JobImpl implements ReplyJob { private class SendSuccessJob extends JobImpl implements ReplyJob {
private final SessionKey _key; private final SessionKey _key;
private final TagSetHandle _tags; private final TagSetHandle _tags;
private final LeaseSet _deliveredLS;
/** /**
* Create a new success job that will be fired when the message encrypted with * Create a new success job that will be fired when the message encrypted with
@ -1004,11 +1010,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* *
* @param key may be null * @param key may be null
* @param tags may be null * @param tags may be null
* @param ls the delivered leaseset or null
*/ */
public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) { public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags, LeaseSet ls) {
super(enclosingContext); super(enclosingContext);
_key = key; _key = key;
_tags = tags; _tags = tags;
_deliveredLS = ls;
} }
public String getName() { return "Outbound client message send success"; } public String getName() { return "Outbound client message send success"; }
@ -1017,6 +1025,20 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* May be run after SendTimeoutJob, will re-add the tags. * May be run after SendTimeoutJob, will re-add the tags.
*/ */
public void runJob() { public void runJob() {
if (_deliveredLS != null) {
// note that the delivered LS was acked
LeaseSet oldls = _cache.leaseSetCache.putIfAbsent(_hashPair, _deliveredLS);
if (oldls != null) {
if (_deliveredLS.getDate() > oldls.getDate()) {
_cache.leaseSetCache.put(_hashPair, _deliveredLS);
if (_log.shouldInfo())
_log.info(getJobId() + ": added to cache - got reply LS from " + _toString);
}
} else {
if (_log.shouldInfo())
_log.info(getJobId() + ": added to cache - got reply LS from " + _toString);
}
}
// do we leak tags here? // do we leak tags here?
Result old; Result old;
// never succeed twice but we can succeed after fail // never succeed twice but we can succeed after fail