clean up OCMOSJ cache cleaner

This commit is contained in:
zzz
2009-02-24 23:18:12 +00:00
parent 7a684c160b
commit 559653f0ab

View File

@ -34,6 +34,8 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Send a client message out a random outbound tunnel and into a random inbound
@ -98,6 +100,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*/
private static final int BUNDLE_PROBABILITY_DEFAULT = 100;
private static final Object _initializeLock = new Object();
private static boolean _initialized = false;
private static final int CLEAN_INTERVAL = 5*60*1000;
/**
* Send the sucker
*/
@ -105,20 +111,26 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
super(ctx);
_log = ctx.logManager().getLog(OutboundClientMessageOneShotJob.class);
ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.sendAckTime", "Message round trip time", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look for a remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchNoACK", "Repeated message sends to a peer (no ack required)", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l });
synchronized (_initializeLock) {
if (!_initialized) {
SimpleScheduler.getInstance().addPeriodicEvent(new OCMOSJCacheCleaner(ctx), CLEAN_INTERVAL, CLEAN_INTERVAL);
ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.sendAckTime", "Message round trip time", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look for a remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("client.dispatchNoACK", "Repeated message sends to a peer (no ack required)", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l });
_initialized = true;
}
}
long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
_clientMessage = msg;
_clientMessageId = msg.getMessageId();
@ -201,7 +213,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Key the cache on the source+dest pair.
*/
private static HashMap _leaseSetCache = new HashMap();
private static long _lscleanTime = 0;
private LeaseSet getReplyLeaseSet(boolean force) {
LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
if (newLS == null)
@ -235,10 +246,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// If the last leaseSet we sent him is still good, don't bother sending again
long now = getContext().clock().now();
synchronized (_leaseSetCache) {
if (now - _lscleanTime > 5*60*1000) { // clean out periodically
cleanLeaseSetCache(_leaseSetCache);
_lscleanTime = now;
}
if (!force) {
LeaseSet ls = (LeaseSet) _leaseSetCache.get(hashPair());
if (ls != null) {
@ -306,7 +313,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*
*/
private static HashMap _leaseCache = new HashMap();
private static long _lcleanTime = 0;
private boolean getNextLease() {
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash());
if (_leaseSet == null) {
@ -319,10 +325,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// Use the same lease if it's still good
// Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
synchronized (_leaseCache) {
if (now - _lcleanTime > 5*60*1000) { // clean out periodically
cleanLeaseCache(_leaseCache);
_lcleanTime = now;
}
_lease = (Lease) _leaseCache.get(hashPair());
if (_lease != null) {
// if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
@ -607,7 +609,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* (needed for cleanTunnelCache)
* 44 = 32 * 4 / 3
*/
private Hash sourceFromHashPair(String s) {
private static Hash sourceFromHashPair(String s) {
return new Hash(Base64.decode(s.substring(44, 88)));
}
@ -648,8 +650,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Clean out old leaseSets from a set.
* Caller must synchronize on tc.
*/
private void cleanLeaseSetCache(HashMap tc) {
long now = getContext().clock().now();
private static void cleanLeaseSetCache(RouterContext ctx, HashMap tc) {
long now = ctx.clock().now();
List deleteList = new ArrayList();
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry entry = (Map.Entry)iter.next();
@ -668,7 +670,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Clean out old leases from a set.
* Caller must synchronize on tc.
*/
private void cleanLeaseCache(HashMap tc) {
private static void cleanLeaseCache(HashMap tc) {
List deleteList = new ArrayList();
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry entry = (Map.Entry)iter.next();
@ -687,13 +689,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Clean out old tunnels from a set.
* Caller must synchronize on tc.
*/
private void cleanTunnelCache(HashMap tc) {
private static void cleanTunnelCache(RouterContext ctx, HashMap tc) {
List deleteList = new ArrayList();
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry entry = (Map.Entry)iter.next();
String k = (String) entry.getKey();
TunnelInfo tunnel = (TunnelInfo) entry.getValue();
if (!getContext().tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
if (!ctx.tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
deleteList.add(k);
}
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
@ -702,6 +704,25 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
private static class OCMOSJCacheCleaner implements SimpleTimer.TimedEvent {
private RouterContext _ctx;
private OCMOSJCacheCleaner(RouterContext ctx) {
_ctx = ctx;
}
public void timeReached() {
synchronized(_leaseSetCache) {
cleanLeaseSetCache(_ctx, _leaseSetCache);
}
synchronized(_leaseCache) {
cleanLeaseCache(_leaseCache);
}
synchronized(_tunnelCache) {
cleanTunnelCache(_ctx, _tunnelCache);
cleanTunnelCache(_ctx, _backloggedTunnelCache);
}
}
}
/**
* Use the same outbound tunnel as we did for the same destination previously,
* if possible, to keep the streaming lib happy
@ -712,16 +733,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*/
private static HashMap _tunnelCache = new HashMap();
private static HashMap _backloggedTunnelCache = new HashMap();
private static long _cleanTime = 0;
private TunnelInfo selectOutboundTunnel(Destination to) {
TunnelInfo tunnel;
long now = getContext().clock().now();
synchronized (_tunnelCache) {
if (now - _cleanTime > 5*60*1000) { // clean out periodically
cleanTunnelCache(_tunnelCache);
cleanTunnelCache(_backloggedTunnelCache);
_cleanTime = now;
}
/**
* If old tunnel is valid and no longer backlogged, use it.
* This prevents an active anonymity attack, where a peer could tell