* OCMOSJ: Remove some global cache locks, other cleanups

This commit is contained in:
zzz
2011-08-31 12:52:22 +00:00
parent bd7e655788
commit b328b47bf4
3 changed files with 166 additions and 187 deletions

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 5;
public final static long BUILD = 6;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.crypto.SessionKeyManager;
import net.i2p.crypto.TagSetHandle;
@ -42,14 +43,14 @@ import net.i2p.util.SimpleTimer;
/**
* Send a client message out a random outbound tunnel and into a random inbound
* tunnel on the target leaseSet. This also bundles the sender's leaseSet and
* tunnel on the target leaseSet. This also (sometimes) bundles the sender's leaseSet and
* a DeliveryStatusMessage (for ACKing any sessionTags used in the garlic).
*
*/
public class OutboundClientMessageOneShotJob extends JobImpl {
private final Log _log;
private long _overallExpiration;
private ClientMessage _clientMessage;
private final long _overallExpiration;
private final ClientMessage _clientMessage;
private final MessageId _clientMessageId;
private final int _clientMessageSize;
private final Destination _from;
@ -68,6 +69,80 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private TunnelInfo _inTunnel;
private boolean _wantACK;
/**
* This is the place where we make I2P go fast.
*
* We have five static caches.
* - The LeaseSet cache is used to decide whether to bundle our own leaseset,
* which minimizes overhead.
* - The Lease cache is used to persistently send to the same lease for the destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
* for the same destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The last reply requested cache ensures that a reply is requested every so often,
* so that failed tunnels are recognized.
*
*/
/**
* Key used to cache things with, based on source + dest
*/
private final HashPair _hashPair;
/**
* Use the same outbound tunnel as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Use two caches - although a cache of a list of tunnels per dest might be
* more elegant.
* Key the caches on the source+dest pair.
*
*/
private static final Map<HashPair, TunnelInfo> _tunnelCache = new HashMap(64);
private static final Map<HashPair, TunnelInfo> _backloggedTunnelCache = new HashMap(64);
/**
* Returns the reply lease set if forced to do so,
* or if configured to do so,
* or if a certain percentage of the time if configured to do so,
* or if our lease set has changed since we last talked to them,
* or 10% of the time anyway so they don't forget us (disabled for now),
* or null otherwise.
*
* Note that wantACK randomly forces us another 5% of the time.
*
* We don't want to do this too often as a typical 2-lease leaseset
* in a DatabaseStoreMessage is 861+37=898 bytes -
* when added to garlic it's a 1056-byte addition total, which is huge.
*
* Key the cache on the source+dest pair.
*/
private static final Map<HashPair, LeaseSet> _leaseSetCache = new ConcurrentHashMap(64);
/**
* Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Key the caches on the source+dest pair.
*
* We're going to use the lease until it expires, as long as it remains in the current leaseSet.
*
* If not found,
* fetch the next lease that we should try sending through, randomly chosen
* from within the sorted leaseSet (NOT sorted by # of failures through each
* lease).
*
*/
private static final ConcurrentHashMap<HashPair, Lease> _leaseCache = new ConcurrentHashMap(64);
/**
* This cache is used to ensure that we request a reply every so often.
* Hopefully this allows the router to recognize a failed tunnel and switch,
* before upper layers like streaming lib fail, even for low-bandwidth
* connections like IRC.
*/
private static final Map<HashPair, Long> _lastReplyRequestCache = new ConcurrentHashMap(64);
/**
* final timeout (in milliseconds) that the outbound message will fail in.
* This can be overridden in the router.config or the client's session config
@ -119,20 +194,20 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_clientMessageSize = msg.getPayload().getSize();
_from = msg.getFromDestination();
_to = msg.getDestination();
_hashPair = new HashPair(_from.calculateHash(), _to.calculateHash());
_toString = _to.calculateHash().toBase64().substring(0,4);
_leaseSetLookupBegin = -1;
_start = getContext().clock().now();
// use expiration requested by client if available, otherwise session config,
// otherwise router config, otherwise default
_overallExpiration = msg.getExpiration();
if (_overallExpiration > 0) {
long overallExpiration = msg.getExpiration();
if (overallExpiration > 0) {
// Unless it's already expired, set a min and max expiration
if (_overallExpiration <= _start) {
_overallExpiration = Math.max(_overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN);
_overallExpiration = Math.min(_overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT);
if (overallExpiration <= _start) {
overallExpiration = Math.max(overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN);
overallExpiration = Math.min(overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Message Expiration (ms): " + (_overallExpiration - _start));
_log.info(getJobId() + ": Message Expiration (ms): " + (overallExpiration - _start));
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Expired before we got to it");
@ -152,10 +227,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
}
}
_overallExpiration = timeoutMs + _start;
overallExpiration = timeoutMs + _start;
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + " Default Expiration (ms): " + timeoutMs);
}
_overallExpiration = overallExpiration;
}
/** call once only */
@ -191,10 +267,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
long timeoutMs = _overallExpiration - now;
Hash key = _to.calculateHash();
SendJob success = new SendJob(getContext());
LeaseSet ls = getContext().netDb().lookupLeaseSetLocally(key);
if (ls != null) {
_leaseSet = getContext().netDb().lookupLeaseSetLocally(key);
if (_leaseSet != null) {
getContext().statManager().addRateData("client.leaseSetFoundLocally", 1, 0);
_leaseSetLookupBegin = -1;
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Send outbound client message - leaseSet found locally for " + _toString);
success.runJob();
@ -208,23 +283,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
/**
* Returns the reply lease set if forced to do so,
* or if configured to do so,
* or if a certain percentage of the time if configured to do so,
* or if our lease set has changed since we last talked to them,
* or 10% of the time anyway so they don't forget us (disabled for now),
* or null otherwise.
*
* Note that wantACK randomly forces us another 5% of the time.
*
* We don't want to do this too often as a typical 2-lease leaseset
* in a DatabaseStoreMessage is 861+37=898 bytes -
* when added to garlic it's a 1056-byte addition total, which is huge.
*
* Key the cache on the source+dest pair.
*/
private final static HashMap<HashPair, LeaseSet> _leaseSetCache = new HashMap();
* @param force to force including a reply lease set
* @return lease set or null if we should not send the lease set
*/
private LeaseSet getReplyLeaseSet(boolean force) {
LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
if (newLS == null)
@ -256,9 +317,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
// If the last leaseSet we sent him is still good, don't bother sending again
synchronized (_leaseSetCache) {
LeaseSet ls = _leaseSetCache.put(_hashPair, newLS);
if (!force) {
LeaseSet ls = _leaseSetCache.get(hashPair());
if (ls != null) {
if (ls.equals(newLS)) {
// still good, send it 10% of the time
@ -275,13 +335,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
} else {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Expired from cache - reply leaseset for " + _toString);
// will get overwritten below
// _leaseSetCache.remove(hashPair());
}
}
}
_leaseSetCache.put(hashPair(), newLS);
}
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Added to cache - reply leaseset for " + _toString);
return newLS;
@ -292,17 +349,18 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
public SendJob(RouterContext enclosingContext) {
super(enclosingContext);
}
public String getName() { return "Send outbound client message through the lease"; }
public String getName() { return "Outbound client message send"; }
public void runJob() {
if (_leaseSetLookupBegin > 0) {
long lookupTime = getContext().clock().now() - _leaseSetLookupBegin;
getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, lookupTime);
getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, 0);
}
_wantACK = false;
boolean ok = getNextLease();
if (ok) {
send();
} else {
// shouldn't happen
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")");
dieFatal();
@ -311,32 +369,23 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
/**
* Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Key the caches on the source+dest pair.
*
* We're going to use the lease until it expires, as long as it remains in the current leaseSet.
*
* If not found,
* fetch the next lease that we should try sending through, randomly chosen
* from within the sorted leaseSet (NOT sorted by # of failures through each
* lease).
*
* @return success
*/
private final static HashMap<HashPair, Lease> _leaseCache = new HashMap();
private boolean getNextLease() {
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash());
// set in runJob if found locally
if (_leaseSet == null) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString);
return false;
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash());
if (_leaseSet == null) {
// shouldn't happen
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString);
return false;
}
}
// Use the same lease if it's still good
// Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
synchronized (_leaseCache) {
_lease = _leaseCache.get(hashPair());
_lease = _leaseCache.get(_hashPair);
if (_lease != null) {
// if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
if (!_lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
@ -351,14 +400,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
}
// remove only if still equal to _lease (concurrent)
_leaseCache.remove(_hashPair, _lease);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Expired from cache - lease for " + _toString);
_leaseCache.remove(hashPair());
}
}
// get the possible leases
List leases = new ArrayList(_leaseSet.getLeaseCount());
List<Lease> leases = new ArrayList(_leaseSet.getLeaseCount());
for (int i = 0; i < _leaseSet.getLeaseCount(); i++) {
Lease lease = _leaseSet.getLease(i);
if (lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
@ -401,7 +450,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// Avoid a lease on a gateway we think is unreachable, if possible
for (int i = 0; i < leases.size(); i++) {
Lease l = (Lease) leases.get(i);
Lease l = leases.get(i);
/***
*** Anonymity concerns with this, as the dest could act unreachable just to us, then
*** look at our lease selection.
@ -418,13 +467,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_log.warn(getJobId() + ": Skipping unreachable gateway " + l.getGateway() + " for " + _toString);
}
if (_lease == null) {
_lease = (Lease)leases.get(0);
_lease = leases.get(0);
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": All leases are unreachable for " + _toString);
}
synchronized (_leaseCache) {
_leaseCache.put(hashPair(), _lease);
}
_leaseCache.put(_hashPair, _lease);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Added to cache - lease for " + _toString);
_wantACK = true;
@ -433,15 +480,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
/**
* we couldn't even find the leaseSet, but try again (or die
* if we've already tried too hard)
*
* We couldn't even find the leaseSet, so die
*/
private class LookupLeaseSetFailedJob extends JobImpl {
public LookupLeaseSetFailedJob(RouterContext enclosingContext) {
super(enclosingContext);
}
public String getName() { return "Lookup for outbound client message failed"; }
public String getName() { return "Outbound client message lease lookup failed"; }
public void runJob() {
if (_leaseSetLookupBegin > 0) {
long lookupTime = getContext().clock().now() - _leaseSetLookupBegin;
@ -457,14 +502,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
/**
* This cache is used to ensure that we request a reply every so often.
* Hopefully this allows the router to recognize a failed tunnel and switch,
* before upper layers like streaming lib fail, even for low-bandwidth
* connections like IRC.
*/
private final static HashMap<HashPair, Long> _lastReplyRequestCache = new HashMap();
/**
* Send the message to the specified tunnel by creating a new garlic message containing
* the (already created) payload clove as well as a new delivery status message. This garlic
@ -490,13 +527,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// DONE (selectOutboundTunnel() moved above here): wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK)
// DONE (added new cache): wantACK if we haven't in last 1m (requires a new static cache probably)
boolean wantACK;
synchronized (_lastReplyRequestCache) {
Long lastSent = _lastReplyRequestCache.get(hashPair());
Long lastSent = _lastReplyRequestCache.get(_hashPair);
wantACK = _wantACK || existingTags <= 30 ||
lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL;
if (wantACK)
_lastReplyRequestCache.put(hashPair(), Long.valueOf(now));
}
_lastReplyRequestCache.put(_hashPair, Long.valueOf(now));
PublicKey key = _leaseSet.getEncryptionKey();
SessionKey sessKey = new SessionKey();
@ -559,14 +595,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for "
+ _toString + " at "
+ _lease.getTunnelId() + " on "
+ _lease.getGateway().toBase64());
+ _lease.getGateway());
if (_outTunnel != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Sending tunnel message out " + _outTunnel.getSendTunnelId(0) + " to "
+ _toString + " at "
+ _lease.getTunnelId() + " on "
+ _lease.getGateway().toBase64());
+ _lease.getGateway());
DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now()));
//if (false) // dispatch may take 100+ms, so toss it in its own job
@ -579,7 +615,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0);
dieFatal();
}
_clientMessage = null;
_clove = null;
getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0);
if (!wantACK)
@ -602,7 +637,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_timeoutMs = timeoutMs;
}
public String getName() { return "Dispatch outbound client message"; }
public String getName() { return "Outbound client message dispatch"; }
public void runJob() {
if (_selector != null)
@ -620,32 +655,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime, 0);
}
}
/**
* This is the place where we make I2P go fast.
*
* We have five static caches.
* - The LeaseSet cache is used to decide whether to bundle our own leaseset,
* which minimizes overhead.
* - The Lease cache is used to persistently send to the same lease for the destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
* for the same destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The last reply requested cache ensures that a reply is requested every so often,
* so that failed tunnels are recognized.
*
*/
/**
* Key used to cache things with based on source + dest
*/
private HashPair _hashPair;
private HashPair hashPair() {
if (_hashPair == null)
_hashPair = new HashPair(_from.calculateHash(), _to.calculateHash());
return _hashPair;
}
/**
* Key used to cache things with based on source + dest
@ -671,15 +680,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
/**
* This is a little sneaky, but get the _from back out of the "opaque" hash key
* (needed for cleanTunnelCache)
*/
private static Hash sourceFromHashPair(HashPair s) {
return s.sh;
}
/**
* Called on failure to give us a better chance of success next time.
* Of course this is probably 60s too late.
@ -688,27 +688,21 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* But it's a start.
*/
private void clearCaches() {
HashPair key = hashPair();
if (_inTunnel != null) { // if we wanted an ack, we sent our lease too
synchronized(_leaseSetCache) {
_leaseSetCache.remove(key);
}
_leaseSetCache.remove(_hashPair);
}
if (_lease != null) {
synchronized(_leaseCache) {
Lease l = _leaseCache.get(key);
if (l != null && l.equals(_lease))
_leaseCache.remove(key);
}
// remove only if still equal to _lease (concurrent)
_leaseCache.remove(_hashPair, _lease);
}
if (_outTunnel != null) {
synchronized(_tunnelCache) {
TunnelInfo t = _backloggedTunnelCache.get(key);
TunnelInfo t = _backloggedTunnelCache.get(_hashPair);
if (t != null && t.equals(_outTunnel))
_backloggedTunnelCache.remove(key);
t = _tunnelCache.get(key);
_backloggedTunnelCache.remove(_hashPair);
t = _tunnelCache.get(_hashPair);
if (t != null && t.equals(_outTunnel))
_tunnelCache.remove(key);
_tunnelCache.remove(_hashPair);
}
}
}
@ -717,21 +711,17 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* @since 0.8.8
*/
public static void clearAllCaches() {
synchronized(_leaseSetCache) {
_leaseSetCache.clear();
}
synchronized(_leaseCache) {
_leaseCache.clear();
}
_leaseSetCache.clear();
_leaseCache.clear();
synchronized(_tunnelCache) {
_backloggedTunnelCache.clear();
_tunnelCache.clear();
}
_lastReplyRequestCache.clear();
}
/**
* Clean out old leaseSets
* Caller must synchronize on tc.
*/
private static void cleanLeaseSetCache(RouterContext ctx, Map<HashPair, LeaseSet> tc) {
long now = ctx.clock().now();
@ -744,7 +734,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
/**
* Clean out old leases
* Caller must synchronize on tc.
*/
private static void cleanLeaseCache(Map<HashPair, Lease> tc) {
for (Iterator<Lease> iter = tc.values().iterator(); iter.hasNext(); ) {
@ -759,23 +748,23 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Caller must synchronize on tc.
*/
private static void cleanTunnelCache(RouterContext ctx, Map<HashPair, TunnelInfo> tc) {
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry entry = (Map.Entry)iter.next();
HashPair k = (HashPair) entry.getKey();
TunnelInfo tunnel = (TunnelInfo) entry.getValue();
if (!ctx.tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
for (Iterator<Map.Entry<HashPair, TunnelInfo>> iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<HashPair, TunnelInfo> entry = iter.next();
HashPair k = entry.getKey();
TunnelInfo tunnel = entry.getValue();
// This is a little sneaky, but get the _from back out of the "opaque" hash key
if (!ctx.tunnelManager().isValidTunnel(k.sh, tunnel))
iter.remove();
}
}
/**
* Clean out old reply times
* Caller must synchronize on tc.
*/
private static void cleanReplyCache(RouterContext ctx, Map<HashPair, Long> tc) {
long now = ctx.clock().now();
for (Iterator iter = tc.values().iterator(); iter.hasNext(); ) {
Long l = (Long) iter.next();
for (Iterator<Long> iter = tc.values().iterator(); iter.hasNext(); ) {
Long l = iter.next();
if (l.longValue() < now - CLEAN_INTERVAL)
iter.remove();
}
@ -787,34 +776,16 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_ctx = ctx;
}
public void timeReached() {
synchronized(_leaseSetCache) {
cleanLeaseSetCache(_ctx, _leaseSetCache);
}
synchronized(_leaseCache) {
cleanLeaseCache(_leaseCache);
}
cleanLeaseSetCache(_ctx, _leaseSetCache);
cleanLeaseCache(_leaseCache);
synchronized(_tunnelCache) {
cleanTunnelCache(_ctx, _tunnelCache);
cleanTunnelCache(_ctx, _backloggedTunnelCache);
}
synchronized(_lastReplyRequestCache) {
cleanReplyCache(_ctx, _lastReplyRequestCache);
}
cleanReplyCache(_ctx, _lastReplyRequestCache);
}
}
/**
* Use the same outbound tunnel as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Use two caches - although a cache of a list of tunnels per dest might be
* more elegant.
* Key the caches on the source+dest pair.
*
*/
private static final HashMap<HashPair, TunnelInfo> _tunnelCache = new HashMap();
private static HashMap<HashPair, TunnelInfo> _backloggedTunnelCache = new HashMap();
private TunnelInfo selectOutboundTunnel(Destination to) {
TunnelInfo tunnel;
synchronized (_tunnelCache) {
@ -824,22 +795,22 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* if you were the originator by backlogging the tunnel, then removing the
* backlog and seeing if traffic came back or not.
*/
tunnel = _backloggedTunnelCache.get(hashPair());
tunnel = _backloggedTunnelCache.get(_hashPair);
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (!getContext().commSystem().isBacklogged(tunnel.getPeer(1))) {
if (_log.shouldLog(Log.WARN))
_log.warn("Switching back to tunnel " + tunnel + " for " + _toString);
_backloggedTunnelCache.remove(hashPair());
_tunnelCache.put(hashPair(), tunnel);
_backloggedTunnelCache.remove(_hashPair);
_tunnelCache.put(_hashPair, tunnel);
_wantACK = true;
return tunnel;
} // else still backlogged
} else // no longer valid
_backloggedTunnelCache.remove(hashPair());
_backloggedTunnelCache.remove(_hashPair);
}
// Use the same tunnel unless backlogged
tunnel = _tunnelCache.get(hashPair());
tunnel = _tunnelCache.get(_hashPair);
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1)))
@ -847,28 +818,34 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// backlogged
if (_log.shouldLog(Log.WARN))
_log.warn("Switching from backlogged " + tunnel + " for " + _toString);
_backloggedTunnelCache.put(hashPair(), tunnel);
_backloggedTunnelCache.put(_hashPair, tunnel);
} // else no longer valid
_tunnelCache.remove(hashPair());
_tunnelCache.remove(_hashPair);
}
// Pick a new tunnel
tunnel = selectOutboundTunnel();
if (tunnel != null)
_tunnelCache.put(hashPair(), tunnel);
_tunnelCache.put(_hashPair, tunnel);
_wantACK = true;
}
return tunnel;
}
/**
* Pick an arbitrary outbound tunnel to send the message through, or null if
* there aren't any around
*
* TODO - rather than pick one at random, pick the "closest" to the lease,
* to minimize network OBEP - IBGW connections?
* This would also eliminate a connection when OBEP == IBGW.
* Anonymity issues?
*/
private TunnelInfo selectOutboundTunnel() {
return getContext().tunnelManager().selectOutboundTunnel(_from.calculateHash());
}
/**
* Pick an arbitrary outbound tunnel for any deliveryStatusMessage to come back in
* Pick an arbitrary inbound tunnel for any deliveryStatusMessage to come back in
*
*/
private TunnelInfo selectInboundTunnel() {
@ -902,7 +879,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, false);
getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
_clientMessage = null;
_clove = null;
}
@ -955,9 +931,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
public ReplySelector(long token) {
_pendingToken = token;
if (_log.shouldLog(Log.INFO))
_log.info(OutboundClientMessageOneShotJob.this.getJobId()
+ ": Reply selector for client message: token=" + token);
//if (_log.shouldLog(Log.INFO))
// _log.info(OutboundClientMessageOneShotJob.this.getJobId()
// + ": Reply selector for client message: token=" + token);
}
public boolean continueMatching() {
@ -1006,7 +982,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_tags = tags;
}
public String getName() { return "Send client message successful"; }
public String getName() { return "Outbound client message send success"; }
public void runJob() {
// do we leak tags here?
@ -1067,7 +1043,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_tags = tags;
}
public String getName() { return "Send client message timed out"; }
public String getName() { return "Outbound client message send timeout"; }
public void runJob() {
if (_log.shouldLog(Log.INFO))