OCMOSJ: Cancel timeout job on reply

JobQueue: Improve removeJob()
This commit is contained in:
zzz
2019-12-03 15:43:44 +00:00
parent 9289a6daa9
commit cad3c46ea6
4 changed files with 24 additions and 7 deletions

View File

@ -1,3 +1,14 @@
2019-12-03 zzz
* NDT: Numerous fixes (ticket #2672)
* OCMOSJ: Cancel timeout job on reply
2019-12-02 zzz
* Console:
- Move restart status up in summary bar
- Process restart status first regardless of display order
* NDT: Prevent NPE on JSON parse of bad/empty input (ticket #2672)
* Update manager: Notify GeoIP type and file version
* 2019-12-01 0.9.44 released * 2019-12-01 0.9.44 released
2019-11-30 zzz 2019-11-30 zzz

View File

@ -240,8 +240,10 @@ public class JobQueue {
public void removeJob(Job job) { public void removeJob(Job job) {
synchronized (_jobLock) { synchronized (_jobLock) {
_readyJobs.remove(job); boolean removed = _timedJobs.remove(job);
_timedJobs.remove(job); // linear search, do this last
if (!removed)
_readyJobs.remove(job);
} }
} }

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 1; public final static long BUILD = 2;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@ -685,8 +685,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (skm != null) if (skm != null)
tsh = skm.tagsDelivered(_encryptionKey, sessKey, tags); tsh = skm.tagsDelivered(_encryptionKey, sessKey, tags);
} }
onReply = new SendSuccessJob(getContext(), sessKey, tsh, replyLeaseSet);
onFail = new SendTimeoutJob(getContext(), sessKey, tsh); onFail = new SendTimeoutJob(getContext(), sessKey, tsh);
onReply = new SendSuccessJob(getContext(), sessKey, tsh, replyLeaseSet, onFail);
long expiration = Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN); long expiration = Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN);
selector = new ReplySelector(token, expiration); selector = new ReplySelector(token, expiration);
} }
@ -747,8 +747,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
} else { } else {
// We put our own timeout on the job queue before the selector expires, // We put our own timeout on the job queue before the selector expires,
// so we can keep waiting for the reply and restore the tags (success-after-failure) // so we can keep waiting for the reply and restore the tags (success-after-failure)
// The timeout job will always fire, even after success. // We cancel the timeout job in the success job
// We don't bother cancelling the timeout job as JobQueue.removeJob() is a linear search
getContext().messageRegistry().registerPending(_selector, _replyFound, null); getContext().messageRegistry().registerPending(_selector, _replyFound, null);
_replyTimeout.getTiming().setStartAfter(_overallExpiration); _replyTimeout.getTiming().setStartAfter(_overallExpiration);
getContext().jobQueue().addJob(_replyTimeout); getContext().jobQueue().addJob(_replyTimeout);
@ -1004,6 +1003,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private final SessionKey _key; private final SessionKey _key;
private final TagSetHandle _tags; private final TagSetHandle _tags;
private final LeaseSet _deliveredLS; private final LeaseSet _deliveredLS;
private final SendTimeoutJob _replyTimeout;
/** /**
* Create a new success job that will be fired when the message encrypted with * Create a new success job that will be fired when the message encrypted with
@ -1013,12 +1013,15 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* @param key may be null * @param key may be null
* @param tags may be null * @param tags may be null
* @param ls the delivered leaseset or null * @param ls the delivered leaseset or null
* @param timeout will be cancelled when this is run
*/ */
public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags, LeaseSet ls) { public SendSuccessJob(RouterContext enclosingContext, SessionKey key,
TagSetHandle tags, LeaseSet ls, SendTimeoutJob timeout) {
super(enclosingContext); super(enclosingContext);
_key = key; _key = key;
_tags = tags; _tags = tags;
_deliveredLS = ls; _deliveredLS = ls;
_replyTimeout = timeout;
} }
public String getName() { return "Outbound client message send success"; } public String getName() { return "Outbound client message send success"; }
@ -1060,6 +1063,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
skm.tagsAcked(_encryptionKey, _key, _tags); skm.tagsAcked(_encryptionKey, _key, _tags);
} }
} }
getContext().jobQueue().removeJob(_replyTimeout);
long sendTime = getContext().clock().now() - _start; long sendTime = getContext().clock().now() - _start;
if (old == Result.FAIL) { if (old == Result.FAIL) {