* OCMOSJ: Refactor cache to its own class, make non-static

zzz
2012-03-16 12:20:29 +00:00
parent fb8244ee1a
commit 3a2fe5e860
5 changed files with 281 additions and 216 deletions

history.txt

@ -1,3 +1,14 @@
2012-03-16 zzz
* FragmentHandler: Zero-copy read of unfragmented messages
for speed and to reduce object churn
* Home page: Tag tooltip; CSS tweaks; news tweak
* HTTP Proxy: Jump and addresshelper page tweaks
* Jetty: Add I2P mime types to default eepsite config
* OCMOSJ: Refactor cache to its own class, make non-static
* TransportManager: Fix fatal exception on soft restart caused by DHSKB refactoring
* TrustedUpdate: Preserve default key names even when keys are set
in advanced config
2012-03-15 sponge
* Plugins:
- String.isEmpty() [ java 6 ] -> (String.length() == 0) [ java 5 ]

net/i2p/router/ClientMessagePool.java

@ -11,6 +11,7 @@ package net.i2p.router;
import java.util.Properties;
import net.i2p.client.I2PClient;
import net.i2p.router.message.OutboundCache;
import net.i2p.router.message.OutboundClientMessageOneShotJob;
import net.i2p.util.Log;
@ -25,10 +26,12 @@ import net.i2p.util.Log;
public class ClientMessagePool {
private final Log _log;
private final RouterContext _context;
private final OutboundCache _cache;
public ClientMessagePool(RouterContext context) {
_context = context;
_log = _context.logManager().getLog(ClientMessagePool.class);
_cache = new OutboundCache(_context);
OutboundClientMessageOneShotJob.init(_context);
}
@ -36,7 +39,7 @@ public class ClientMessagePool {
* @since 0.8.8
*/
public void shutdown() {
-OutboundClientMessageOneShotJob.clearAllCaches();
+_cache.clearAllCaches();
}
/**
@ -72,7 +75,7 @@ public class ClientMessagePool {
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding message for remote delivery");
-OutboundClientMessageOneShotJob j = new OutboundClientMessageOneShotJob(_context, msg);
+OutboundClientMessageOneShotJob j = new OutboundClientMessageOneShotJob(_context, _cache, msg);
if (true) // blocks the I2CP reader for a nontrivial period of time
j.runJob();
else

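For context, a minimal sketch (not part of this commit) of how the refactored pieces fit together: the per-router ClientMessagePool owns the single OutboundCache instance and passes it to every OutboundClientMessageOneShotJob it creates, so the caches are scoped to one RouterContext instead of being JVM-wide statics. The class and method names below are hypothetical.

package net.i2p.router;

import net.i2p.router.message.OutboundCache;
import net.i2p.router.message.OutboundClientMessageOneShotJob;

// Hypothetical illustration only; mirrors what ClientMessagePool does above.
class OutboundCacheWiringSketch {
    private final RouterContext _context;
    private final OutboundCache _cache;

    OutboundCacheWiringSketch(RouterContext ctx) {
        _context = ctx;
        // one cache (and one periodic cleaner task) per router context, not per JVM
        _cache = new OutboundCache(ctx);
    }

    void send(ClientMessage msg) {
        // each job shares this router's cache via the new constructor argument
        OutboundClientMessageOneShotJob j =
            new OutboundClientMessageOneShotJob(_context, _cache, msg);
        j.runJob();
    }
}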
net/i2p/router/RouterVersion.java

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
-public final static long BUILD = 15;
+public final static long BUILD = 16;
/** for example "-test" */
public final static String EXTRA = "";

net/i2p/router/message/OutboundCache.java

@ -0,0 +1,238 @@
package net.i2p.router.message;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Hash;
import net.i2p.data.Lease;
import net.i2p.data.LeaseSet;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Helper for OCMOSJ
*
* This is the place where we make I2P go fast.
*
* We have five caches.
* - The LeaseSet cache is used to decide whether to bundle our own leaseset,
* which minimizes overhead.
* - The Lease cache is used to persistently send to the same lease for the destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
* for the same destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The last reply requested cache ensures that a reply is requested every so often,
* so that failed tunnels are recognized.
*
* @since 0.9 moved out of OCMOSJ
*/
public class OutboundCache {
/**
* Use the same outbound tunnel as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Use two caches - although a cache of a list of tunnels per dest might be
* more elegant.
* Key the caches on the source+dest pair.
*
* NOT concurrent.
*/
final Map<HashPair, TunnelInfo> tunnelCache = new HashMap(64);
/*
* NOT concurrent.
*/
final Map<HashPair, TunnelInfo> backloggedTunnelCache = new HashMap(64);
/**
* Returns the reply lease set if forced to do so,
* or if configured to do so,
* or a certain percentage of the time if configured to do so,
* or if our lease set has changed since we last talked to them,
* or 10% of the time anyway so they don't forget us (disabled for now),
* or null otherwise.
*
* Note that wantACK randomly forces us another 5% of the time.
*
* We don't want to do this too often as a typical 2-lease leaseset
* in a DatabaseStoreMessage is 861+37=898 bytes -
* when added to garlic it's a 1056-byte addition total, which is huge.
*
* Key the cache on the source+dest pair.
*
* Concurrent.
*/
final Map<HashPair, LeaseSet> leaseSetCache = new ConcurrentHashMap(64);
/**
* Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Key the caches on the source+dest pair.
*
* We're going to use the lease until it expires, as long as it remains in the current leaseSet.
*
* If not found,
* fetch the next lease that we should try sending through, randomly chosen
* from within the sorted leaseSet (NOT sorted by # of failures through each
* lease).
*
* Concurrent.
*/
final ConcurrentHashMap<HashPair, Lease> leaseCache = new ConcurrentHashMap(64);
/**
* This cache is used to ensure that we request a reply every so often.
* Hopefully this allows the router to recognize a failed tunnel and switch,
* before upper layers like streaming lib fail, even for low-bandwidth
* connections like IRC.
*
* Concurrent.
*/
final Map<HashPair, Long> lastReplyRequestCache = new ConcurrentHashMap(64);
private final RouterContext _context;
private static final int CLEAN_INTERVAL = 5*60*1000;
public OutboundCache(RouterContext ctx) {
_context = ctx;
SimpleScheduler.getInstance().addPeriodicEvent(new OCMOSJCacheCleaner(), CLEAN_INTERVAL, CLEAN_INTERVAL);
}
/**
* Key used to cache things with based on source + dest
* @since 0.8.3
*/
static class HashPair {
private final Hash sh, dh;
public HashPair(Hash s, Hash d) {
sh = s;
dh = d;
}
public int hashCode() {
return sh.hashCode() ^ dh.hashCode();
}
public boolean equals(Object o) {
if (o == null || !(o instanceof HashPair))
return false;
HashPair hp = (HashPair) o;
return sh.equals(hp.sh) && dh.equals(hp.dh);
}
}
/**
* Called on failure to give us a better chance of success next time.
* Of course this is probably 60s too late.
* And we could pick the bad ones at random again.
* Or remove entries that were sent and succeeded after this was sent but before this failed.
* But it's a start.
*
* @param lease may be null
* @param inTunnel may be null
* @param outTunnel may be null
*/
void clearCaches(HashPair hashPair, Lease lease, TunnelInfo inTunnel, TunnelInfo outTunnel) {
if (inTunnel != null) { // if we wanted an ack, we sent our lease too
leaseSetCache.remove(hashPair);
}
if (lease != null) {
// remove only if still equal to lease (concurrent)
leaseCache.remove(hashPair, lease);
}
if (outTunnel != null) {
synchronized(tunnelCache) {
TunnelInfo t = backloggedTunnelCache.get(hashPair);
if (t != null && t.equals(outTunnel))
backloggedTunnelCache.remove(hashPair);
t = tunnelCache.get(hashPair);
if (t != null && t.equals(outTunnel))
tunnelCache.remove(hashPair);
}
}
}
/**
* @since 0.8.8
*/
public void clearAllCaches() {
leaseSetCache.clear();
leaseCache.clear();
synchronized(tunnelCache) {
backloggedTunnelCache.clear();
tunnelCache.clear();
}
lastReplyRequestCache.clear();
}
/**
* Clean out old leaseSets
*/
private static void cleanLeaseSetCache(RouterContext ctx, Map<HashPair, LeaseSet> tc) {
long now = ctx.clock().now();
for (Iterator<LeaseSet> iter = tc.values().iterator(); iter.hasNext(); ) {
LeaseSet l = iter.next();
if (l.getEarliestLeaseDate() < now)
iter.remove();
}
}
/**
* Clean out old leases
*/
private static void cleanLeaseCache(Map<HashPair, Lease> tc) {
for (Iterator<Lease> iter = tc.values().iterator(); iter.hasNext(); ) {
Lease l = iter.next();
if (l.isExpired(Router.CLOCK_FUDGE_FACTOR))
iter.remove();
}
}
/**
* Clean out old tunnels
* Caller must synchronize on tc.
*/
private static void cleanTunnelCache(RouterContext ctx, Map<HashPair, TunnelInfo> tc) {
for (Iterator<Map.Entry<HashPair, TunnelInfo>> iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<HashPair, TunnelInfo> entry = iter.next();
HashPair k = entry.getKey();
TunnelInfo tunnel = entry.getValue();
// This is a little sneaky, but get the _from back out of the "opaque" hash key
if (!ctx.tunnelManager().isValidTunnel(k.sh, tunnel))
iter.remove();
}
}
/**
* Clean out old reply times
*/
private static void cleanReplyCache(RouterContext ctx, Map<HashPair, Long> tc) {
long now = ctx.clock().now();
for (Iterator<Long> iter = tc.values().iterator(); iter.hasNext(); ) {
Long l = iter.next();
if (l.longValue() < now - CLEAN_INTERVAL)
iter.remove();
}
}
private class OCMOSJCacheCleaner implements SimpleTimer.TimedEvent {
public void timeReached() {
cleanLeaseSetCache(_context, leaseSetCache);
cleanLeaseCache(leaseCache);
synchronized(tunnelCache) {
cleanTunnelCache(_context, tunnelCache);
cleanTunnelCache(_context, backloggedTunnelCache);
}
cleanReplyCache(_context, lastReplyRequestCache);
}
}
}
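The leaseSetCache javadoc above describes a decision (bundle our own LeaseSet when forced, when configured, or when it changed since we last talked to this destination) whose implementation lives in OutboundClientMessageOneShotJob and is not part of this diff. The following is a rough, hypothetical sketch of that check only, placed in the same package so it can use the package-private HashPair key; the class and method names are assumptions.

package net.i2p.router.message;

import java.util.Map;
import net.i2p.data.LeaseSet;

// Hypothetical sketch only; the real logic (probability knobs, config options,
// the disabled 10% refresh) is in OutboundClientMessageOneShotJob, unchanged here.
class LeaseSetBundleSketch {
    /**
     * Remember the LeaseSet we are sending now and report whether it should
     * be bundled: always when forced, otherwise only when it is new or has
     * changed since the last message keyed on this source+dest pair.
     */
    static boolean shouldBundle(Map<OutboundCache.HashPair, LeaseSet> leaseSetCache,
                                OutboundCache.HashPair key,
                                LeaseSet current, boolean force) {
        LeaseSet previous = leaseSetCache.put(key, current);
        if (force)
            return true;
        return previous == null || !previous.equals(current);
    }
}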

net/i2p/router/message/OutboundClientMessageOneShotJob.java

@ -2,18 +2,14 @@ package net.i2p.router.message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.crypto.SessionKeyManager;
import net.i2p.crypto.TagSetHandle;
import net.i2p.data.Base64;
import net.i2p.data.Certificate;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
@ -38,8 +34,6 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Send a client message out a random outbound tunnel and into a random inbound
@ -49,6 +43,7 @@ import net.i2p.util.SimpleTimer;
*/
public class OutboundClientMessageOneShotJob extends JobImpl {
private final Log _log;
private final OutboundCache _cache;
private final long _overallExpiration;
private final ClientMessage _clientMessage;
private final MessageId _clientMessageId;
@ -69,79 +64,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private TunnelInfo _inTunnel;
private boolean _wantACK;
/**
* This is the place where we make I2P go fast.
*
* We have five static caches.
* - The LeaseSet cache is used to decide whether to bundle our own leaseset,
* which minimizes overhead.
* - The Lease cache is used to persistently send to the same lease for the destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
* for the same destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The last reply requested cache ensures that a reply is requested every so often,
* so that failed tunnels are recognized.
*
*/
/**
* Key used to cache things with, based on source + dest
*/
-private final HashPair _hashPair;
+private final OutboundCache.HashPair _hashPair;
/**
* Use the same outbound tunnel as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Use two caches - although a cache of a list of tunnels per dest might be
* more elegant.
* Key the caches on the source+dest pair.
*
*/
private static final Map<HashPair, TunnelInfo> _tunnelCache = new HashMap(64);
private static final Map<HashPair, TunnelInfo> _backloggedTunnelCache = new HashMap(64);
/**
* Returns the reply lease set if forced to do so,
* or if configured to do so,
* or if a certain percentage of the time if configured to do so,
* or if our lease set has changed since we last talked to them,
* or 10% of the time anyway so they don't forget us (disabled for now),
* or null otherwise.
*
* Note that wantACK randomly forces us another 5% of the time.
*
* We don't want to do this too often as a typical 2-lease leaseset
* in a DatabaseStoreMessage is 861+37=898 bytes -
* when added to garlic it's a 1056-byte addition total, which is huge.
*
* Key the cache on the source+dest pair.
*/
private static final Map<HashPair, LeaseSet> _leaseSetCache = new ConcurrentHashMap(64);
/**
* Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Key the caches on the source+dest pair.
*
* We're going to use the lease until it expires, as long as it remains in the current leaseSet.
*
* If not found,
* fetch the next lease that we should try sending through, randomly chosen
* from within the sorted leaseSet (NOT sorted by # of failures through each
* lease).
*
*/
private static final ConcurrentHashMap<HashPair, Lease> _leaseCache = new ConcurrentHashMap(64);
/**
* This cache is used to ensure that we request a reply every so often.
* Hopefully this allows the router to recognize a failed tunnel and switch,
* before upper layers like streaming lib fail, even for low-bandwidth
* connections like IRC.
*/
private static final Map<HashPair, Long> _lastReplyRequestCache = new ConcurrentHashMap(64);
/**
* final timeout (in milliseconds) that the outbound message will fail in.
@ -178,14 +104,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*/
private static final int BUNDLE_PROBABILITY_DEFAULT = 100;
private static final int CLEAN_INTERVAL = 5*60*1000;
private static final int REPLY_REQUEST_INTERVAL = 60*1000;
/**
* Send the sucker
*/
-public OutboundClientMessageOneShotJob(RouterContext ctx, ClientMessage msg) {
+public OutboundClientMessageOneShotJob(RouterContext ctx, OutboundCache cache, ClientMessage msg) {
super(ctx);
_cache = cache;
_log = ctx.logManager().getLog(OutboundClientMessageOneShotJob.class);
long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
@ -194,7 +120,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_clientMessageSize = msg.getPayload().getSize();
_from = msg.getFromDestination();
_to = msg.getDestination();
-_hashPair = new HashPair(_from.calculateHash(), _to.calculateHash());
+_hashPair = new OutboundCache.HashPair(_from.calculateHash(), _to.calculateHash());
_toString = _to.calculateHash().toBase64().substring(0,4);
_start = getContext().clock().now();
@ -236,7 +162,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
/** call once only */
public static void init(RouterContext ctx) {
SimpleScheduler.getInstance().addPeriodicEvent(new OCMOSJCacheCleaner(ctx), CLEAN_INTERVAL, CLEAN_INTERVAL);
ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRequiredRateStat("client.sendAckTime", "Message round trip time (ms)", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
@ -317,7 +242,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
// If the last leaseSet we sent him is still good, don't bother sending again
-LeaseSet ls = _leaseSetCache.put(_hashPair, newLS);
+LeaseSet ls = _cache.leaseSetCache.put(_hashPair, newLS);
if (!force) {
if (ls != null) {
if (ls.equals(newLS)) {
@ -369,6 +294,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
/**
* Choose a lease from his leaseset to send the message to. Sets _lease.
* Sets _wantACK if it's new or changed.
* @return success
*/
private boolean getNextLease() {
@ -385,7 +312,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// Use the same lease if it's still good
// Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
-_lease = _leaseCache.get(_hashPair);
+_lease = _cache.leaseCache.get(_hashPair);
if (_lease != null) {
// if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
if (!_lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
@ -401,7 +328,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
// remove only if still equal to _lease (concurrent)
-_leaseCache.remove(_hashPair, _lease);
+_cache.leaseCache.remove(_hashPair, _lease);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Expired from cache - lease for " + _toString);
}
@ -471,7 +398,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": All leases are unreachable for " + _toString);
}
-_leaseCache.put(_hashPair, _lease);
+_cache.leaseCache.put(_hashPair, _lease);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Added to cache - lease for " + _toString);
_wantACK = true;
@ -528,11 +455,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// DONE (added new cache): wantACK if we haven't in last 1m (requires a new static cache probably)
boolean wantACK;
-Long lastSent = _lastReplyRequestCache.get(_hashPair);
+Long lastSent = _cache.lastReplyRequestCache.get(_hashPair);
wantACK = _wantACK || existingTags <= 30 ||
lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL;
if (wantACK)
-_lastReplyRequestCache.put(_hashPair, Long.valueOf(now));
+_cache.lastReplyRequestCache.put(_hashPair, Long.valueOf(now));
PublicKey key = _leaseSet.getEncryptionKey();
SessionKey sessKey = new SessionKey();
@ -656,30 +583,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
/**
* Key used to cache things with based on source + dest
* @since 0.8.3
*/
private static class HashPair {
private final Hash sh, dh;
public HashPair(Hash s, Hash d) {
sh = s;
dh = d;
}
public int hashCode() {
return sh.hashCode() ^ dh.hashCode();
}
public boolean equals(Object o) {
if (o == null || !(o instanceof HashPair))
return false;
HashPair hp = (HashPair) o;
return sh.equals(hp.sh) && dh.equals(hp.dh);
}
}
/**
* Called on failure to give us a better chance of success next time.
* Of course this is probably 60s too late.
@ -688,129 +591,39 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* But it's a start.
*/
private void clearCaches() {
-if (_inTunnel != null) { // if we wanted an ack, we sent our lease too
+_cache.clearCaches(_hashPair, _lease, _inTunnel, _outTunnel);
_leaseSetCache.remove(_hashPair);
}
if (_lease != null) {
// remove only if still equal to _lease (concurrent)
_leaseCache.remove(_hashPair, _lease);
}
if (_outTunnel != null) {
synchronized(_tunnelCache) {
TunnelInfo t = _backloggedTunnelCache.get(_hashPair);
if (t != null && t.equals(_outTunnel))
_backloggedTunnelCache.remove(_hashPair);
t = _tunnelCache.get(_hashPair);
if (t != null && t.equals(_outTunnel))
_tunnelCache.remove(_hashPair);
}
}
}
/**
-* @since 0.8.8
+* Choose our outbound tunnel to send the message through.
* Sets _wantACK if it's new or changed.
* @return the tunnel or null on failure
*/
public static void clearAllCaches() {
_leaseSetCache.clear();
_leaseCache.clear();
synchronized(_tunnelCache) {
_backloggedTunnelCache.clear();
_tunnelCache.clear();
}
_lastReplyRequestCache.clear();
}
/**
* Clean out old leaseSets
*/
private static void cleanLeaseSetCache(RouterContext ctx, Map<HashPair, LeaseSet> tc) {
long now = ctx.clock().now();
for (Iterator<LeaseSet> iter = tc.values().iterator(); iter.hasNext(); ) {
LeaseSet l = iter.next();
if (l.getEarliestLeaseDate() < now)
iter.remove();
}
}
/**
* Clean out old leases
*/
private static void cleanLeaseCache(Map<HashPair, Lease> tc) {
for (Iterator<Lease> iter = tc.values().iterator(); iter.hasNext(); ) {
Lease l = iter.next();
if (l.isExpired(Router.CLOCK_FUDGE_FACTOR))
iter.remove();
}
}
/**
* Clean out old tunnels
* Caller must synchronize on tc.
*/
private static void cleanTunnelCache(RouterContext ctx, Map<HashPair, TunnelInfo> tc) {
for (Iterator<Map.Entry<HashPair, TunnelInfo>> iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<HashPair, TunnelInfo> entry = iter.next();
HashPair k = entry.getKey();
TunnelInfo tunnel = entry.getValue();
// This is a little sneaky, but get the _from back out of the "opaque" hash key
if (!ctx.tunnelManager().isValidTunnel(k.sh, tunnel))
iter.remove();
}
}
/**
* Clean out old reply times
*/
private static void cleanReplyCache(RouterContext ctx, Map<HashPair, Long> tc) {
long now = ctx.clock().now();
for (Iterator<Long> iter = tc.values().iterator(); iter.hasNext(); ) {
Long l = iter.next();
if (l.longValue() < now - CLEAN_INTERVAL)
iter.remove();
}
}
private static class OCMOSJCacheCleaner implements SimpleTimer.TimedEvent {
private RouterContext _ctx;
private OCMOSJCacheCleaner(RouterContext ctx) {
_ctx = ctx;
}
public void timeReached() {
cleanLeaseSetCache(_ctx, _leaseSetCache);
cleanLeaseCache(_leaseCache);
synchronized(_tunnelCache) {
cleanTunnelCache(_ctx, _tunnelCache);
cleanTunnelCache(_ctx, _backloggedTunnelCache);
}
cleanReplyCache(_ctx, _lastReplyRequestCache);
}
}
private TunnelInfo selectOutboundTunnel(Destination to) {
TunnelInfo tunnel;
-synchronized (_tunnelCache) {
+synchronized (_cache.tunnelCache) {
/**
* If old tunnel is valid and no longer backlogged, use it.
* This prevents an active anonymity attack, where a peer could tell
* if you were the originator by backlogging the tunnel, then removing the
* backlog and seeing if traffic came back or not.
*/
-tunnel = _backloggedTunnelCache.get(_hashPair);
+tunnel = _cache.backloggedTunnelCache.get(_hashPair);
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (!getContext().commSystem().isBacklogged(tunnel.getPeer(1))) {
if (_log.shouldLog(Log.WARN))
_log.warn("Switching back to tunnel " + tunnel + " for " + _toString);
-_backloggedTunnelCache.remove(_hashPair);
-_tunnelCache.put(_hashPair, tunnel);
+_cache.backloggedTunnelCache.remove(_hashPair);
+_cache.tunnelCache.put(_hashPair, tunnel);
_wantACK = true;
return tunnel;
} // else still backlogged
} else // no longer valid
-_backloggedTunnelCache.remove(_hashPair);
+_cache.backloggedTunnelCache.remove(_hashPair);
}
// Use the same tunnel unless backlogged
-tunnel = _tunnelCache.get(_hashPair);
+tunnel = _cache.tunnelCache.get(_hashPair);
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1)))
@ -818,14 +631,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// backlogged
if (_log.shouldLog(Log.WARN))
_log.warn("Switching from backlogged " + tunnel + " for " + _toString);
-_backloggedTunnelCache.put(_hashPair, tunnel);
+_cache.backloggedTunnelCache.put(_hashPair, tunnel);
} // else no longer valid
-_tunnelCache.remove(_hashPair);
+_cache.tunnelCache.remove(_hashPair);
}
// Pick a new tunnel
tunnel = selectOutboundTunnel();
if (tunnel != null)
-_tunnelCache.put(_hashPair, tunnel);
+_cache.tunnelCache.put(_hashPair, tunnel);
_wantACK = true;
}
return tunnel;