* Outbound message:

- Fix a couple of tunnel cache cleaning bugs
      - Cache based on source+dest pairs rather than just dest
      - Send the reply leaseSet only when necessary,
        rather than all the time (big savings in overhead)
      - Enable persistent lease selection again
      - Logging tweaks
This commit is contained in:
zzz
2008-05-05 14:01:22 +00:00
parent b1af22a15e
commit a6f3478db3

View File

@ -10,6 +10,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import net.i2p.data.Base64;
import net.i2p.data.Certificate;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
@ -48,7 +49,6 @@ import net.i2p.util.Log;
public class OutboundClientMessageOneShotJob extends JobImpl {
private Log _log;
private long _overallExpiration;
private boolean _shouldBundle;
private ClientMessage _clientMessage;
private MessageId _clientMessageId;
private int _clientMessageSize;
@ -147,7 +147,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_start = getContext().clock().now();
_overallExpiration = timeoutMs + _start;
_shouldBundle = getShouldBundle();
_finished = false;
}
@ -177,30 +176,89 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
private boolean getShouldBundle() {
Properties opts = _clientMessage.getSenderConfig().getOptions();
String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "true");
if ("true".equals(wantBundle)) {
int probability = BUNDLE_PROBABILITY_DEFAULT;
String str = opts.getProperty(BUNDLE_PROBABILITY);
try {
if (str != null)
probability = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly ["
+ str + "]", nfe);
/**
* Returns the reply lease set if forced to do so,
* or if configured to do so,
* or if a certain percentage of the time if configured to do so,
* or if our lease set has changed since we last talked to them,
* or 10% of the time anyway so they don't forget us (disabled for now),
* or null otherwise.
*
* Note that wantACK randomly forces us another 5% of the time.
*
* We don't want to do this too often as a typical 2-lease leaseset
* in a DatabaseStoreMessage is 861+37=898 bytes -
* when added to garlic it's a 1056-byte addition total, which is huge.
*
* Key the cache on the source+dest pair.
*/
private static HashMap _leaseSetCache = new HashMap();
private static long _lscleanTime = 0;
private LeaseSet getReplyLeaseSet(boolean force) {
LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
if (newLS == null)
return null; // punt
if (!force) {
// Don't send it every time unless configured to; default=false
Properties opts = _clientMessage.getSenderConfig().getOptions();
String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "false");
if ("true".equals(wantBundle)) {
int probability = BUNDLE_PROBABILITY_DEFAULT;
String str = opts.getProperty(BUNDLE_PROBABILITY);
try {
if (str != null)
probability = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly ["
+ str + "]", nfe);
}
if (probability >= 100)
return newLS; // do this every time so don't worry about cache
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Bundle leaseSet probability is " + probability);
if (probability >= getContext().random().nextInt(100))
force = true; // just add newLS to cache below and return
// fall through to cache check and add
}
if (probability >= 100)
return true;
_log.error(getJobId() + ": Bundle leaseSet probability is " + probability);
if (probability >= getContext().random().nextInt(100))
return true;
else
return false;
} else {
return false;
}
// If the last leaseSet we sent him is still good, don't bother sending again
long now = getContext().clock().now();
synchronized (_leaseSetCache) {
if (now - _lscleanTime > 5*60*1000) { // clean out periodically
cleanLeaseSetCache(_leaseSetCache);
_lscleanTime = now;
}
if (!force) {
LeaseSet ls = (LeaseSet) _leaseSetCache.get(hashPair());
if (ls != null) {
if (ls.equals(newLS)) {
// still good, send it 10% of the time
// sendACK does 5% random which forces us, good enough
//if (10 >= getContext().random().nextInt(100)) {
// if (_log.shouldLog(Log.INFO))
// _log.info("Found in cache - including reply leaseset for " + _toString);
// return ls;
//} else {
if (_log.shouldLog(Log.INFO))
_log.info("Found in cache - NOT including reply leaseset for " + _toString);
return null;
//}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Expired from cache - reply leaseset for " + _toString);
// will get overwritten below
// _leaseSetCache.remove(hashPair());
}
}
}
_leaseSetCache.put(hashPair(), newLS);
}
if (_log.shouldLog(Log.WARN))
_log.warn("Added to cache - reply leaseset for " + _toString);
return newLS;
}
/** send a message to a lease */
@ -228,9 +286,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
/**
* Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,
* if possible, to keep the streaming lib happy
* Key the cache just on the dest, not on source+dest, as different sources
* simultaneously talking to the same dest is probably rare enough
* to not bother separating out.
* Key the caches on the source+dest pair.
*
* We're going to use the lease until it expires, as long as it remains in the current leaseSet.
*
@ -251,15 +307,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
long now = getContext().clock().now();
/*** removed until we fix SSU reachability
// Use the same lease if it's still good
// Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
synchronized (_leaseCache) {
if (now - _cleanTime > 5*60*1000) { // clean out periodically
if (now - _lcleanTime > 5*60*1000) { // clean out periodically
cleanLeaseCache(_leaseCache);
_cleanTime = now;
_lcleanTime = now;
}
_lease = (Lease) _leaseCache.get(_to);
_lease = (Lease) _leaseCache.get(hashPair());
if (_lease != null) {
// if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
if (!_lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
@ -279,7 +334,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_leaseCache.remove(_to);
}
}
***/
// get the possible leases
List leases = new ArrayList(_leaseSet.getLeaseCount());
for (int i = 0; i < _leaseSet.getLeaseCount(); i++) {
@ -345,13 +400,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": All leases are unreachable for " + _toString);
}
/*** removed until we fix SSU reachability
synchronized (_leaseCache) {
_leaseCache.put(_to, _lease);
_leaseCache.put(hashPair(), _lease);
}
if (_log.shouldLog(Log.WARN))
_log.warn("Added to cache - lease for " + _toString);
***/
return true;
}
@ -400,15 +453,15 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) )
wantACK = false;
long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1);
PublicKey key = _leaseSet.getEncryptionKey();
SessionKey sessKey = new SessionKey();
Set tags = new HashSet();
LeaseSet replyLeaseSet = null;
if (_shouldBundle) {
replyLeaseSet = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
}
// If we want an ack, bundle a leaseSet... (so he can get back to us)
LeaseSet replyLeaseSet = getReplyLeaseSet(wantACK);
// ... and vice versa (so we know he got it)
if (replyLeaseSet != null)
wantACK = true;
long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1);
if (wantACK)
_inTunnel = selectInboundTunnel();
@ -511,6 +564,91 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
}
}
/**
* This is the place where we make I2P go fast.
*
* We have four static caches.
* - The LeaseSet cache is used to decide whether to bundle our own leaseset,
* which minimizes overhead.
* - The Lease cache is used to persistently send to the same lease for the destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
* - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
* for the same destination,
* which keeps the streaming lib happy by minimizing out-of-order delivery.
*
*/
/**
* String used to cache things with based on source + dest
*/
private String _hashPair;
private String hashPair() {
if (_hashPair == null)
_hashPair = _from.calculateHash().toBase64() + _to.calculateHash().toBase64();
return _hashPair;
}
/**
* This is a little sneaky, but get the _from back out of the "opaque" hash key
* (needed for cleanTunnelCache)
* 44 = 32 * 4 / 3
*/
private Hash sourceFromHashPair(String s) {
return new Hash(Base64.decode(s.substring(0, 44)));
}
/**
* Called on failure to give us a better chance of success next time.
* Of course this is probably 60s too late.
* And we could pick the bad ones at random again.
* Or remove entries that were sent and succeeded after this was sent but before this failed.
* But it's a start.
*/
private void clearCaches() {
String key = hashPair();
if (_inTunnel != null) { // if we wanted an ack, we sent our lease too
synchronized(_leaseSetCache) {
_leaseSetCache.remove(key);
}
}
if (_lease != null) {
synchronized(_leaseCache) {
Lease l = (Lease) _leaseCache.get(key);
if (l != null && l.equals(_lease))
_leaseCache.remove(key);
}
}
if (_outTunnel != null) {
synchronized(_tunnelCache) {
TunnelInfo t =(TunnelInfo) _backloggedTunnelCache.get(key);
if (t != null && t.equals(_outTunnel))
_backloggedTunnelCache.remove(key);
t = (TunnelInfo) _tunnelCache.get(key);
if (t != null && t.equals(_outTunnel))
_tunnelCache.remove(key);
}
}
}
/**
* Clean out old leaseSets from a set.
* Caller must synchronize on tc.
*/
private void cleanLeaseSetCache(HashMap tc) {
long now = getContext().clock().now();
List deleteList = new ArrayList();
for (Iterator iter = tc.keySet().iterator(); iter.hasNext(); ) {
String k = (String) iter.next();
LeaseSet l = (LeaseSet) tc.get(k);
if (l.getEarliestLeaseDate() < now)
deleteList.add(k);
}
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
String k = (String) iter.next();
tc.remove(k);
}
}
/**
* Clean out old leases from a set.
* Caller must synchronize on tc.
@ -518,14 +656,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private void cleanLeaseCache(HashMap tc) {
List deleteList = new ArrayList();
for (Iterator iter = tc.keySet().iterator(); iter.hasNext(); ) {
Destination dest = (Destination) iter.next();
Lease l = (Lease) tc.get(dest);
String k = (String) iter.next();
Lease l = (Lease) tc.get(k);
if (l.isExpired(Router.CLOCK_FUDGE_FACTOR))
deleteList.add(dest);
deleteList.add(k);
}
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
Destination dest = (Destination) iter.next();
tc.remove(dest);
String k = (String) iter.next();
tc.remove(k);
}
}
@ -536,14 +674,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private void cleanTunnelCache(HashMap tc) {
List deleteList = new ArrayList();
for (Iterator iter = tc.keySet().iterator(); iter.hasNext(); ) {
Destination dest = (Destination) iter.next();
TunnelInfo tunnel = (TunnelInfo) tc.get(dest);
if (!getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel))
deleteList.add(dest);
String k = (String) iter.next();
TunnelInfo tunnel = (TunnelInfo) tc.get(k);
if (!getContext().tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
deleteList.add(k);
}
for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
Destination dest = (Destination) iter.next();
tc.remove(dest);
String k = (String) iter.next();
tc.remove(k);
}
}
@ -552,9 +690,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* if possible, to keep the streaming lib happy
* Use two caches - although a cache of a list of tunnels per dest might be
* more elegant.
* Key the caches just on the dest, not on source+dest, as different sources
* simultaneously talking to the same dest is probably rare enough
* to not bother separating out.
* Key the caches on the source+dest pair.
*
*/
private static HashMap _tunnelCache = new HashMap();
@ -575,21 +711,21 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
* if you were the originator by backlogging the tunnel, then removing the
* backlog and seeing if traffic came back or not.
*/
tunnel = (TunnelInfo) _backloggedTunnelCache.get(to);
tunnel = (TunnelInfo) _backloggedTunnelCache.get(hashPair());
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (!getContext().commSystem().isBacklogged(tunnel.getPeer(1))) {
if (_log.shouldLog(Log.WARN))
_log.warn("Switching back to tunnel " + tunnel + " for " + _toString);
_backloggedTunnelCache.remove(to);
_tunnelCache.put(to, tunnel);
_backloggedTunnelCache.remove(hashPair());
_tunnelCache.put(hashPair(), tunnel);
return tunnel;
} // else still backlogged
} else // no longer valid
_backloggedTunnelCache.remove(to);
_backloggedTunnelCache.remove(hashPair());
}
// Use the same tunnel unless backlogged
tunnel = (TunnelInfo) _tunnelCache.get(to);
tunnel = (TunnelInfo) _tunnelCache.get(hashPair());
if (tunnel != null) {
if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
if (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1)))
@ -597,14 +733,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// backlogged
if (_log.shouldLog(Log.WARN))
_log.warn("Switching from backlogged " + tunnel + " for " + _toString);
_backloggedTunnelCache.put(to, tunnel);
_backloggedTunnelCache.put(hashPair(), tunnel);
} // else no longer valid
_tunnelCache.remove(to);
_tunnelCache.remove(hashPair());
}
// Pick a new tunnel
tunnel = selectOutboundTunnel();
if (tunnel != null)
_tunnelCache.put(to, tunnel);
_tunnelCache.put(hashPair(), tunnel);
}
return tunnel;
}
@ -647,6 +783,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().statManager().addRateData("client.timeoutCongestionMessage", messageDelay, 1);
getContext().statManager().addRateData("client.timeoutCongestionInbound", inboundDelta, 1);
clearCaches();
getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, false);
getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
@ -704,22 +841,22 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_pendingToken = token;
if (_log.shouldLog(Log.INFO))
_log.info(OutboundClientMessageOneShotJob.this.getJobId()
+ "Reply selector for client message: token=" + token);
+ ": Reply selector for client message: token=" + token);
}
public boolean continueMatching() {
if (_log.shouldLog(Log.DEBUG))
_log.debug(OutboundClientMessageOneShotJob.this.getJobId()
+ "dont continue matching for token=" + _pendingToken);
+ ": dont continue matching for token=" + _pendingToken);
return false;
}
public long getExpiration() { return _overallExpiration; }
public boolean isMatch(I2NPMessage inMsg) {
if (inMsg.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
if (_log.shouldLog(Log.INFO))
_log.info(OutboundClientMessageOneShotJob.this.getJobId()
+ "delivery status message received: " + inMsg + " our token: " + _pendingToken);
if (_log.shouldLog(Log.DEBUG))
_log.debug(OutboundClientMessageOneShotJob.this.getJobId()
+ ": delivery status message received: " + inMsg + " our token: " + _pendingToken);
return _pendingToken == ((DeliveryStatusMessage)inMsg).getMessageId();
} else {
return false;