diff --git a/history.txt b/history.txt index 1f5fc3c543..85c6c983c6 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,6 @@ +2011-08-31 zzz + * OCMOSJ: Remove some global cache locks, other cleanups + 2011-08-30 zzz * I2CP: Cache b32 lookups client-side * I2PTunnelHTTPClient: Use existing session for b32 lookups diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 4e28a2c8d2..c10128fe62 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 5; + public final static long BUILD = 6; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 13cd85267a..049510fb2d 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.TagSetHandle; @@ -42,14 +43,14 @@ import net.i2p.util.SimpleTimer; /** * Send a client message out a random outbound tunnel and into a random inbound - * tunnel on the target leaseSet. This also bundles the sender's leaseSet and + * tunnel on the target leaseSet. This also (sometimes) bundles the sender's leaseSet and * a DeliveryStatusMessage (for ACKing any sessionTags used in the garlic). * */ public class OutboundClientMessageOneShotJob extends JobImpl { private final Log _log; - private long _overallExpiration; - private ClientMessage _clientMessage; + private final long _overallExpiration; + private final ClientMessage _clientMessage; private final MessageId _clientMessageId; private final int _clientMessageSize; private final Destination _from; @@ -68,6 +69,80 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private TunnelInfo _inTunnel; private boolean _wantACK; + /** + * This is the place where we make I2P go fast. + * + * We have five static caches. + * - The LeaseSet cache is used to decide whether to bundle our own leaseset, + * which minimizes overhead. + * - The Lease cache is used to persistently send to the same lease for the destination, + * which keeps the streaming lib happy by minimizing out-of-order delivery. + * - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel + * for the same destination, + * which keeps the streaming lib happy by minimizing out-of-order delivery. + * - The last reply requested cache ensures that a reply is requested every so often, + * so that failed tunnels are recognized. + * + */ + + /** + * Key used to cache things with, based on source + dest + */ + private final HashPair _hashPair; + + /** + * Use the same outbound tunnel as we did for the same destination previously, + * if possible, to keep the streaming lib happy + * Use two caches - although a cache of a list of tunnels per dest might be + * more elegant. + * Key the caches on the source+dest pair. + * + */ + private static final Map _tunnelCache = new HashMap(64); + + private static final Map _backloggedTunnelCache = new HashMap(64); + + /** + * Returns the reply lease set if forced to do so, + * or if configured to do so, + * or if a certain percentage of the time if configured to do so, + * or if our lease set has changed since we last talked to them, + * or 10% of the time anyway so they don't forget us (disabled for now), + * or null otherwise. + * + * Note that wantACK randomly forces us another 5% of the time. + * + * We don't want to do this too often as a typical 2-lease leaseset + * in a DatabaseStoreMessage is 861+37=898 bytes - + * when added to garlic it's a 1056-byte addition total, which is huge. + * + * Key the cache on the source+dest pair. + */ + private static final Map _leaseSetCache = new ConcurrentHashMap(64); + + /** + * Use the same inbound tunnel (i.e. lease) as we did for the same destination previously, + * if possible, to keep the streaming lib happy + * Key the caches on the source+dest pair. + * + * We're going to use the lease until it expires, as long as it remains in the current leaseSet. + * + * If not found, + * fetch the next lease that we should try sending through, randomly chosen + * from within the sorted leaseSet (NOT sorted by # of failures through each + * lease). + * + */ + private static final ConcurrentHashMap _leaseCache = new ConcurrentHashMap(64); + + /** + * This cache is used to ensure that we request a reply every so often. + * Hopefully this allows the router to recognize a failed tunnel and switch, + * before upper layers like streaming lib fail, even for low-bandwidth + * connections like IRC. + */ + private static final Map _lastReplyRequestCache = new ConcurrentHashMap(64); + /** * final timeout (in milliseconds) that the outbound message will fail in. * This can be overridden in the router.config or the client's session config @@ -119,20 +194,20 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _clientMessageSize = msg.getPayload().getSize(); _from = msg.getFromDestination(); _to = msg.getDestination(); + _hashPair = new HashPair(_from.calculateHash(), _to.calculateHash()); _toString = _to.calculateHash().toBase64().substring(0,4); - _leaseSetLookupBegin = -1; _start = getContext().clock().now(); // use expiration requested by client if available, otherwise session config, // otherwise router config, otherwise default - _overallExpiration = msg.getExpiration(); - if (_overallExpiration > 0) { + long overallExpiration = msg.getExpiration(); + if (overallExpiration > 0) { // Unless it's already expired, set a min and max expiration - if (_overallExpiration <= _start) { - _overallExpiration = Math.max(_overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN); - _overallExpiration = Math.min(_overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT); + if (overallExpiration <= _start) { + overallExpiration = Math.max(overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN); + overallExpiration = Math.min(overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT); if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Message Expiration (ms): " + (_overallExpiration - _start)); + _log.info(getJobId() + ": Message Expiration (ms): " + (overallExpiration - _start)); } else { if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Expired before we got to it"); @@ -152,10 +227,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl { timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; } } - _overallExpiration = timeoutMs + _start; + overallExpiration = timeoutMs + _start; if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + " Default Expiration (ms): " + timeoutMs); } + _overallExpiration = overallExpiration; } /** call once only */ @@ -191,10 +267,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { long timeoutMs = _overallExpiration - now; Hash key = _to.calculateHash(); SendJob success = new SendJob(getContext()); - LeaseSet ls = getContext().netDb().lookupLeaseSetLocally(key); - if (ls != null) { + _leaseSet = getContext().netDb().lookupLeaseSetLocally(key); + if (_leaseSet != null) { getContext().statManager().addRateData("client.leaseSetFoundLocally", 1, 0); - _leaseSetLookupBegin = -1; if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Send outbound client message - leaseSet found locally for " + _toString); success.runJob(); @@ -208,23 +283,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } /** - * Returns the reply lease set if forced to do so, - * or if configured to do so, - * or if a certain percentage of the time if configured to do so, - * or if our lease set has changed since we last talked to them, - * or 10% of the time anyway so they don't forget us (disabled for now), - * or null otherwise. - * - * Note that wantACK randomly forces us another 5% of the time. - * - * We don't want to do this too often as a typical 2-lease leaseset - * in a DatabaseStoreMessage is 861+37=898 bytes - - * when added to garlic it's a 1056-byte addition total, which is huge. - * - * Key the cache on the source+dest pair. - */ - private final static HashMap _leaseSetCache = new HashMap(); - + * @param force to force including a reply lease set + * @return lease set or null if we should not send the lease set + */ private LeaseSet getReplyLeaseSet(boolean force) { LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash()); if (newLS == null) @@ -256,9 +317,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } // If the last leaseSet we sent him is still good, don't bother sending again - synchronized (_leaseSetCache) { + LeaseSet ls = _leaseSetCache.put(_hashPair, newLS); if (!force) { - LeaseSet ls = _leaseSetCache.get(hashPair()); if (ls != null) { if (ls.equals(newLS)) { // still good, send it 10% of the time @@ -275,13 +335,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } else { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Expired from cache - reply leaseset for " + _toString); - // will get overwritten below - // _leaseSetCache.remove(hashPair()); } } } - _leaseSetCache.put(hashPair(), newLS); - } + if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Added to cache - reply leaseset for " + _toString); return newLS; @@ -292,17 +349,18 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public SendJob(RouterContext enclosingContext) { super(enclosingContext); } - public String getName() { return "Send outbound client message through the lease"; } + public String getName() { return "Outbound client message send"; } public void runJob() { if (_leaseSetLookupBegin > 0) { long lookupTime = getContext().clock().now() - _leaseSetLookupBegin; - getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, lookupTime); + getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, 0); } _wantACK = false; boolean ok = getNextLease(); if (ok) { send(); } else { + // shouldn't happen if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")"); dieFatal(); @@ -311,32 +369,23 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } /** - * Use the same inbound tunnel (i.e. lease) as we did for the same destination previously, - * if possible, to keep the streaming lib happy - * Key the caches on the source+dest pair. - * - * We're going to use the lease until it expires, as long as it remains in the current leaseSet. - * - * If not found, - * fetch the next lease that we should try sending through, randomly chosen - * from within the sorted leaseSet (NOT sorted by # of failures through each - * lease). - * + * @return success */ - private final static HashMap _leaseCache = new HashMap(); - private boolean getNextLease() { - _leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash()); + // set in runJob if found locally if (_leaseSet == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString); - return false; + _leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash()); + if (_leaseSet == null) { + // shouldn't happen + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString); + return false; + } } // Use the same lease if it's still good // Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't... - synchronized (_leaseCache) { - _lease = _leaseCache.get(hashPair()); + _lease = _leaseCache.get(_hashPair); if (_lease != null) { // if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ?? if (!_lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) { @@ -351,14 +400,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } } } + // remove only if still equal to _lease (concurrent) + _leaseCache.remove(_hashPair, _lease); if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Expired from cache - lease for " + _toString); - _leaseCache.remove(hashPair()); } - } // get the possible leases - List leases = new ArrayList(_leaseSet.getLeaseCount()); + List leases = new ArrayList(_leaseSet.getLeaseCount()); for (int i = 0; i < _leaseSet.getLeaseCount(); i++) { Lease lease = _leaseSet.getLease(i); if (lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) { @@ -401,7 +450,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // Avoid a lease on a gateway we think is unreachable, if possible for (int i = 0; i < leases.size(); i++) { - Lease l = (Lease) leases.get(i); + Lease l = leases.get(i); /*** *** Anonymity concerns with this, as the dest could act unreachable just to us, then *** look at our lease selection. @@ -418,13 +467,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _log.warn(getJobId() + ": Skipping unreachable gateway " + l.getGateway() + " for " + _toString); } if (_lease == null) { - _lease = (Lease)leases.get(0); + _lease = leases.get(0); if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": All leases are unreachable for " + _toString); } - synchronized (_leaseCache) { - _leaseCache.put(hashPair(), _lease); - } + _leaseCache.put(_hashPair, _lease); if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Added to cache - lease for " + _toString); _wantACK = true; @@ -433,15 +480,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl { /** - * we couldn't even find the leaseSet, but try again (or die - * if we've already tried too hard) - * + * We couldn't even find the leaseSet, so die */ private class LookupLeaseSetFailedJob extends JobImpl { public LookupLeaseSetFailedJob(RouterContext enclosingContext) { super(enclosingContext); } - public String getName() { return "Lookup for outbound client message failed"; } + public String getName() { return "Outbound client message lease lookup failed"; } public void runJob() { if (_leaseSetLookupBegin > 0) { long lookupTime = getContext().clock().now() - _leaseSetLookupBegin; @@ -457,14 +502,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } } - /** - * This cache is used to ensure that we request a reply every so often. - * Hopefully this allows the router to recognize a failed tunnel and switch, - * before upper layers like streaming lib fail, even for low-bandwidth - * connections like IRC. - */ - private final static HashMap _lastReplyRequestCache = new HashMap(); - /** * Send the message to the specified tunnel by creating a new garlic message containing * the (already created) payload clove as well as a new delivery status message. This garlic @@ -490,13 +527,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // DONE (selectOutboundTunnel() moved above here): wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK) // DONE (added new cache): wantACK if we haven't in last 1m (requires a new static cache probably) boolean wantACK; - synchronized (_lastReplyRequestCache) { - Long lastSent = _lastReplyRequestCache.get(hashPair()); + + Long lastSent = _lastReplyRequestCache.get(_hashPair); wantACK = _wantACK || existingTags <= 30 || lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL; if (wantACK) - _lastReplyRequestCache.put(hashPair(), Long.valueOf(now)); - } + _lastReplyRequestCache.put(_hashPair, Long.valueOf(now)); PublicKey key = _leaseSet.getEncryptionKey(); SessionKey sessKey = new SessionKey(); @@ -559,14 +595,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for " + _toString + " at " + _lease.getTunnelId() + " on " - + _lease.getGateway().toBase64()); + + _lease.getGateway()); if (_outTunnel != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Sending tunnel message out " + _outTunnel.getSendTunnelId(0) + " to " + _toString + " at " + _lease.getTunnelId() + " on " - + _lease.getGateway().toBase64()); + + _lease.getGateway()); DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now())); //if (false) // dispatch may take 100+ms, so toss it in its own job @@ -579,7 +615,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0); dieFatal(); } - _clientMessage = null; _clove = null; getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0); if (!wantACK) @@ -602,7 +637,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _timeoutMs = timeoutMs; } - public String getName() { return "Dispatch outbound client message"; } + public String getName() { return "Outbound client message dispatch"; } public void runJob() { if (_selector != null) @@ -620,32 +655,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime, 0); } } - - /** - * This is the place where we make I2P go fast. - * - * We have five static caches. - * - The LeaseSet cache is used to decide whether to bundle our own leaseset, - * which minimizes overhead. - * - The Lease cache is used to persistently send to the same lease for the destination, - * which keeps the streaming lib happy by minimizing out-of-order delivery. - * - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel - * for the same destination, - * which keeps the streaming lib happy by minimizing out-of-order delivery. - * - The last reply requested cache ensures that a reply is requested every so often, - * so that failed tunnels are recognized. - * - */ - - /** - * Key used to cache things with based on source + dest - */ - private HashPair _hashPair; - private HashPair hashPair() { - if (_hashPair == null) - _hashPair = new HashPair(_from.calculateHash(), _to.calculateHash()); - return _hashPair; - } /** * Key used to cache things with based on source + dest @@ -671,15 +680,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } } - - /** - * This is a little sneaky, but get the _from back out of the "opaque" hash key - * (needed for cleanTunnelCache) - */ - private static Hash sourceFromHashPair(HashPair s) { - return s.sh; - } - /** * Called on failure to give us a better chance of success next time. * Of course this is probably 60s too late. @@ -688,27 +688,21 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * But it's a start. */ private void clearCaches() { - HashPair key = hashPair(); if (_inTunnel != null) { // if we wanted an ack, we sent our lease too - synchronized(_leaseSetCache) { - _leaseSetCache.remove(key); - } + _leaseSetCache.remove(_hashPair); } if (_lease != null) { - synchronized(_leaseCache) { - Lease l = _leaseCache.get(key); - if (l != null && l.equals(_lease)) - _leaseCache.remove(key); - } + // remove only if still equal to _lease (concurrent) + _leaseCache.remove(_hashPair, _lease); } if (_outTunnel != null) { synchronized(_tunnelCache) { - TunnelInfo t = _backloggedTunnelCache.get(key); + TunnelInfo t = _backloggedTunnelCache.get(_hashPair); if (t != null && t.equals(_outTunnel)) - _backloggedTunnelCache.remove(key); - t = _tunnelCache.get(key); + _backloggedTunnelCache.remove(_hashPair); + t = _tunnelCache.get(_hashPair); if (t != null && t.equals(_outTunnel)) - _tunnelCache.remove(key); + _tunnelCache.remove(_hashPair); } } } @@ -717,21 +711,17 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * @since 0.8.8 */ public static void clearAllCaches() { - synchronized(_leaseSetCache) { - _leaseSetCache.clear(); - } - synchronized(_leaseCache) { - _leaseCache.clear(); - } + _leaseSetCache.clear(); + _leaseCache.clear(); synchronized(_tunnelCache) { _backloggedTunnelCache.clear(); _tunnelCache.clear(); } + _lastReplyRequestCache.clear(); } /** * Clean out old leaseSets - * Caller must synchronize on tc. */ private static void cleanLeaseSetCache(RouterContext ctx, Map tc) { long now = ctx.clock().now(); @@ -744,7 +734,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { /** * Clean out old leases - * Caller must synchronize on tc. */ private static void cleanLeaseCache(Map tc) { for (Iterator iter = tc.values().iterator(); iter.hasNext(); ) { @@ -759,23 +748,23 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Caller must synchronize on tc. */ private static void cleanTunnelCache(RouterContext ctx, Map tc) { - for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) { - Map.Entry entry = (Map.Entry)iter.next(); - HashPair k = (HashPair) entry.getKey(); - TunnelInfo tunnel = (TunnelInfo) entry.getValue(); - if (!ctx.tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel)) + for (Iterator> iter = tc.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry entry = iter.next(); + HashPair k = entry.getKey(); + TunnelInfo tunnel = entry.getValue(); + // This is a little sneaky, but get the _from back out of the "opaque" hash key + if (!ctx.tunnelManager().isValidTunnel(k.sh, tunnel)) iter.remove(); } } /** * Clean out old reply times - * Caller must synchronize on tc. */ private static void cleanReplyCache(RouterContext ctx, Map tc) { long now = ctx.clock().now(); - for (Iterator iter = tc.values().iterator(); iter.hasNext(); ) { - Long l = (Long) iter.next(); + for (Iterator iter = tc.values().iterator(); iter.hasNext(); ) { + Long l = iter.next(); if (l.longValue() < now - CLEAN_INTERVAL) iter.remove(); } @@ -787,34 +776,16 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _ctx = ctx; } public void timeReached() { - synchronized(_leaseSetCache) { - cleanLeaseSetCache(_ctx, _leaseSetCache); - } - synchronized(_leaseCache) { - cleanLeaseCache(_leaseCache); - } + cleanLeaseSetCache(_ctx, _leaseSetCache); + cleanLeaseCache(_leaseCache); synchronized(_tunnelCache) { cleanTunnelCache(_ctx, _tunnelCache); cleanTunnelCache(_ctx, _backloggedTunnelCache); } - synchronized(_lastReplyRequestCache) { - cleanReplyCache(_ctx, _lastReplyRequestCache); - } + cleanReplyCache(_ctx, _lastReplyRequestCache); } } - /** - * Use the same outbound tunnel as we did for the same destination previously, - * if possible, to keep the streaming lib happy - * Use two caches - although a cache of a list of tunnels per dest might be - * more elegant. - * Key the caches on the source+dest pair. - * - */ - private static final HashMap _tunnelCache = new HashMap(); - - private static HashMap _backloggedTunnelCache = new HashMap(); - private TunnelInfo selectOutboundTunnel(Destination to) { TunnelInfo tunnel; synchronized (_tunnelCache) { @@ -824,22 +795,22 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * if you were the originator by backlogging the tunnel, then removing the * backlog and seeing if traffic came back or not. */ - tunnel = _backloggedTunnelCache.get(hashPair()); + tunnel = _backloggedTunnelCache.get(_hashPair); if (tunnel != null) { if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) { if (!getContext().commSystem().isBacklogged(tunnel.getPeer(1))) { if (_log.shouldLog(Log.WARN)) _log.warn("Switching back to tunnel " + tunnel + " for " + _toString); - _backloggedTunnelCache.remove(hashPair()); - _tunnelCache.put(hashPair(), tunnel); + _backloggedTunnelCache.remove(_hashPair); + _tunnelCache.put(_hashPair, tunnel); _wantACK = true; return tunnel; } // else still backlogged } else // no longer valid - _backloggedTunnelCache.remove(hashPair()); + _backloggedTunnelCache.remove(_hashPair); } // Use the same tunnel unless backlogged - tunnel = _tunnelCache.get(hashPair()); + tunnel = _tunnelCache.get(_hashPair); if (tunnel != null) { if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) { if (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1))) @@ -847,28 +818,34 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // backlogged if (_log.shouldLog(Log.WARN)) _log.warn("Switching from backlogged " + tunnel + " for " + _toString); - _backloggedTunnelCache.put(hashPair(), tunnel); + _backloggedTunnelCache.put(_hashPair, tunnel); } // else no longer valid - _tunnelCache.remove(hashPair()); + _tunnelCache.remove(_hashPair); } // Pick a new tunnel tunnel = selectOutboundTunnel(); if (tunnel != null) - _tunnelCache.put(hashPair(), tunnel); + _tunnelCache.put(_hashPair, tunnel); _wantACK = true; } return tunnel; } + /** * Pick an arbitrary outbound tunnel to send the message through, or null if * there aren't any around * + * TODO - rather than pick one at random, pick the "closest" to the lease, + * to minimize network OBEP - IBGW connections? + * This would also eliminate a connection when OBEP == IBGW. + * Anonymity issues? */ private TunnelInfo selectOutboundTunnel() { return getContext().tunnelManager().selectOutboundTunnel(_from.calculateHash()); } + /** - * Pick an arbitrary outbound tunnel for any deliveryStatusMessage to come back in + * Pick an arbitrary inbound tunnel for any deliveryStatusMessage to come back in * */ private TunnelInfo selectInboundTunnel() { @@ -902,7 +879,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime); getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, false); getContext().statManager().updateFrequency("client.sendMessageFailFrequency"); - _clientMessage = null; _clove = null; } @@ -955,9 +931,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public ReplySelector(long token) { _pendingToken = token; - if (_log.shouldLog(Log.INFO)) - _log.info(OutboundClientMessageOneShotJob.this.getJobId() - + ": Reply selector for client message: token=" + token); + //if (_log.shouldLog(Log.INFO)) + // _log.info(OutboundClientMessageOneShotJob.this.getJobId() + // + ": Reply selector for client message: token=" + token); } public boolean continueMatching() { @@ -1006,7 +982,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _tags = tags; } - public String getName() { return "Send client message successful"; } + public String getName() { return "Outbound client message send success"; } public void runJob() { // do we leak tags here? @@ -1067,7 +1043,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _tags = tags; } - public String getName() { return "Send client message timed out"; } + public String getName() { return "Outbound client message send timeout"; } public void runJob() { if (_log.shouldLog(Log.INFO))