* OCMOSJ:

- Change from 5% reply requests to at least
        once per minute, in hopes of reducing IRC drops
      - More clean up of the cache cleaning
This commit is contained in:
zzz
2009-03-01 20:45:16 +00:00
parent 59b624a4a4
commit c455fa6309

View File

@ -103,6 +103,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private static final Object _initializeLock = new Object();
private static boolean _initialized = false;
private static final int CLEAN_INTERVAL = 5*60*1000;
private static final int REPLY_REQUEST_INTERVAL = 60*1000;
/**
* Send the sucker
@ -212,7 +213,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*
* Key the cache on the source+dest pair.
*/
private static HashMap _leaseSetCache = new HashMap();
private static HashMap<String, LeaseSet> _leaseSetCache = new HashMap();
private LeaseSet getReplyLeaseSet(boolean force) {
LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
if (newLS == null)
@ -247,7 +248,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
long now = getContext().clock().now();
synchronized (_leaseSetCache) {
if (!force) {
LeaseSet ls = (LeaseSet) _leaseSetCache.get(hashPair());
LeaseSet ls = _leaseSetCache.get(hashPair());
if (ls != null) {
if (ls.equals(newLS)) {
// still good, send it 10% of the time
@ -312,7 +313,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* lease).
*
*/
private static HashMap _leaseCache = new HashMap();
private static HashMap<String, Lease> _leaseCache = new HashMap();
private boolean getNextLease() {
_leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash());
if (_leaseSet == null) {
@ -325,7 +326,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// Use the same lease if it's still good
// Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
synchronized (_leaseCache) {
_lease = (Lease) _leaseCache.get(hashPair());
_lease = _leaseCache.get(hashPair());
if (_lease != null) {
// if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
if (!_lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
@ -446,6 +447,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
/**
* This cache is used to ensure that we request a reply every so often.
* Hopefully this allows the router to recognize a failed tunnel and switch,
* before upper layers like streaming lib fail, even for low-bandwidth
* connections like IRC.
*/
private static HashMap<String, Long> _lastReplyRequestCache = new HashMap();
/**
* Send the message to the specified tunnel by creating a new garlic message containing
* the (already created) payload clove as well as a new delivery status message. This garlic
@ -456,18 +465,27 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*/
private void send() {
if (_finished) return;
if (getContext().clock().now() >= _overallExpiration) {
long now = getContext().clock().now();
if (now >= _overallExpiration) {
dieFatal();
return;
}
int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey());
_outTunnel = selectOutboundTunnel(_to);
// boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5;
// what's the point of 5% random? possible improvements or replacements:
// - wantACK if we changed their inbound lease (getNextLease() sets _wantACK)
// - wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK)
// - wantACK if we haven't in last 1m (requires a new static cache probably)
boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5;
// DONE (getNextLease() is called before this): wantACK if we changed their inbound lease (getNextLease() sets _wantACK)
// DONE (selectOutboundTunnel() moved above here): wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK)
// DONE (added new cache): wantACK if we haven't in last 1m (requires a new static cache probably)
boolean wantACK;
synchronized (_lastReplyRequestCache) {
Long lastSent = _lastReplyRequestCache.get(hashPair());
wantACK = _wantACK || existingTags <= 30 ||
lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL;
if (wantACK)
_lastReplyRequestCache.put(hashPair(), Long.valueOf(now));
}
PublicKey key = _leaseSet.getEncryptionKey();
SessionKey sessKey = new SessionKey();
@ -501,7 +519,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// we dont receive the reply? hmm...)
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left or too lagged) to " + _toString);
getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0);
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0);
dieFatal();
return;
}
@ -539,12 +557,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while");
getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0);
getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0);
dieFatal();
}
_clientMessage = null;
_clove = null;
getContext().statManager().addRateData("client.dispatchPrepareTime", getContext().clock().now() - _start, 0);
getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0);
if (!wantACK)
getContext().statManager().addRateData("client.dispatchNoACK", 1, 0);
}
@ -582,7 +600,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
/**
* This is the place where we make I2P go fast.
*
* We have four static caches.
* We have five static caches.
* - The LeaseSet cache is used to decide whether to bundle our own leaseset,
* which minimizes overhead.
* - The Lease cache is used to persistently send to the same lease for the destination,
@ -590,6 +608,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
* for the same destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The last reply requested cache ensures that a reply is requested every so often,
* so that failed tunnels are recognized.
*
*/
@ -629,17 +649,17 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
if (_lease != null) {
synchronized(_leaseCache) {
Lease l = (Lease) _leaseCache.get(key);
Lease l = _leaseCache.get(key);
if (l != null && l.equals(_lease))
_leaseCache.remove(key);
}
}
if (_outTunnel != null) {
synchronized(_tunnelCache) {
TunnelInfo t =(TunnelInfo) _backloggedTunnelCache.get(key);
TunnelInfo t = _backloggedTunnelCache.get(key);
if (t != null && t.equals(_outTunnel))
_backloggedTunnelCache.remove(key);
t = (TunnelInfo) _tunnelCache.get(key);
t = _tunnelCache.get(key);
if (t != null && t.equals(_outTunnel))
_tunnelCache.remove(key);
}
@ -652,17 +672,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*/
private static void cleanLeaseSetCache(RouterContext ctx, HashMap tc) {
long now = ctx.clock().now();
List deleteList = new ArrayList();
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry entry = (Map.Entry)iter.next();
String k = (String) entry.getKey();
LeaseSet l = (LeaseSet) entry.getValue();
if (l.getEarliestLeaseDate() < now)
deleteList.add(k);
}
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
String k = (String) iter.next();
tc.remove(k);
iter.remove();
}
}
@ -671,17 +686,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Caller must synchronize on tc.
*/
private static void cleanLeaseCache(HashMap tc) {
List deleteList = new ArrayList();
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry entry = (Map.Entry)iter.next();
String k = (String) entry.getKey();
Lease l = (Lease) entry.getValue();
if (l.isExpired(Router.CLOCK_FUDGE_FACTOR))
deleteList.add(k);
}
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
String k = (String) iter.next();
tc.remove(k);
iter.remove();
}
}
@ -690,17 +700,25 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Caller must synchronize on tc.
*/
private static void cleanTunnelCache(RouterContext ctx, HashMap tc) {
List deleteList = new ArrayList();
for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry entry = (Map.Entry)iter.next();
String k = (String) entry.getKey();
TunnelInfo tunnel = (TunnelInfo) entry.getValue();
if (!ctx.tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
deleteList.add(k);
iter.remove();
}
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
String k = (String) iter.next();
tc.remove(k);
}
/**
* Clean out old reply times
* Caller must synchronize on tc.
*/
private static void cleanReplyCache(RouterContext ctx, HashMap tc) {
long now = ctx.clock().now();
for (Iterator iter = tc.values().iterator(); iter.hasNext(); ) {
Long l = (Long) iter.next();
if (l.longValue() < now - CLEAN_INTERVAL)
iter.remove();
}
}
@ -720,6 +738,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
cleanTunnelCache(_ctx, _tunnelCache);
cleanTunnelCache(_ctx, _backloggedTunnelCache);
}
synchronized(_lastReplyRequestCache) {
cleanReplyCache(_ctx, _lastReplyRequestCache);
}
}
}
@ -731,8 +752,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* Key the caches on the source+dest pair.
*
*/
private static HashMap _tunnelCache = new HashMap();
private static HashMap _backloggedTunnelCache = new HashMap();
private static HashMap<String, TunnelInfo> _tunnelCache = new HashMap();
private static HashMap<String, TunnelInfo> _backloggedTunnelCache = new HashMap();
private TunnelInfo selectOutboundTunnel(Destination to) {
TunnelInfo tunnel;
long now = getContext().clock().now();
@ -743,7 +764,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* if you were the originator by backlogging the tunnel, then removing the
* backlog and seeing if traffic came back or not.
*/
tunnel = (TunnelInfo) _backloggedTunnelCache.get(hashPair());
tunnel = _backloggedTunnelCache.get(hashPair());
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (!getContext().commSystem().isBacklogged(tunnel.getPeer(1))) {
@ -758,7 +779,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_backloggedTunnelCache.remove(hashPair());
}
// Use the same tunnel unless backlogged
tunnel = (TunnelInfo) _tunnelCache.get(hashPair());
tunnel = _tunnelCache.get(hashPair());
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1)))