* SessionKeyManager, OCMOSJ, Garlic:

- Enable per-client SessionKeyManagers for better anonymity
      - tagsDelivered() now means tags are sent, not acked.
      - OCMOSJ uses the new TagSetHandle object returned from tagsDelivered()
        to call tagsAcked() or failTags() as appropriate.
      - Assume tags delivered on an established session to
        reduce streaming lib stalls caused by massive tag deliveries;
        should increase throughput and window sizes on long-lived streams
      - Unacked tagsets on a new session are stored on a separate list
      - Don't kill an OB Session just because it's temporarily out of tags
      - Increase min tag threshold to 30 (was 20) due to new speculative
        tags delivered scheme, and to increase effective max window
      - More Java 5 and dead code cleanups, and more comments and javadoc,
        debug logging cleanups
This commit is contained in:
zzz
2009-08-30 16:27:03 +00:00
parent 15f0cda41f
commit e0f1047d72
8 changed files with 346 additions and 100 deletions

View File

@ -95,7 +95,8 @@ public class SessionKeyManager {
* method after receiving an ack to a message delivering them)
*
*/
public void tagsDelivered(PublicKey target, SessionKey key, Set<SessionTag> sessionTags) { // nop
public TagSetHandle tagsDelivered(PublicKey target, SessionKey key, Set<SessionTag> sessionTags) { // nop
return null;
}
/**
@ -134,4 +135,6 @@ public class SessionKeyManager {
}
public void renderStatusHTML(Writer out) throws IOException {}
public void failTags(PublicKey target, SessionKey key, TagSetHandle ts) {}
public void tagsAcked(PublicKey target, SessionKey key, TagSetHandle ts) {}
}

View File

@ -0,0 +1,8 @@
package net.i2p.crypto;
/**
* An opaque handle to a TagSet returned by the SessionKeyManager,
* so that OCMOSJ can report that the tags were later acked, or not.
*
*/
public interface TagSetHandle {}

View File

@ -19,6 +19,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
@ -36,6 +37,41 @@ import net.i2p.util.SimpleTimer;
* to disk). However, this being java, we cannot guarantee that the keys aren't swapped
* out to disk so this should not be considered secure in that sense.
*
* The outbound and inbound sides are completely independent, each with
* their own keys and tags.
*
* For a new session, outbound tags are not considered delivered until an ack is received.
* Otherwise, the loss of the first message would render all subsequent messages
* undecryptable. True?
*
* For an existing session, outbound tags are immediately considered delivered, and are
* later revoked if the ack times out. This prevents massive stream slowdown caused by
* repeated tag delivery after the minimum tag threshold is reached. Included tags
* pushes messages above the ideal 1956 size by ~2KB and causes excessive fragmentation
* and padding. As the tags are not seen by the streaming lib, they aren't accounted
* for in the window size, and one or more of a series of large messages is likely to be dropped,
* either due to high fragmentation or drop priorites at the tunnel OBEP.
*
* For this to work, the minimum tag threshold and tag delivery quanitity defined in
* GarlicMessageBuilder must be chosen with streaming lib windows sizes in mind.
* If a single TagSet is not delivered, there will be no stall as long as the
* current window size is smaller than the minimum tag threshold.
* A second consecutive TagSet delivery failure will cause a complete stall, as
* all subsequent messages will fail to decrypt.
* See ConnectionOptions in streaming for more information.
*
* There are large inefficiencies caused by the repeated delivery of tags in a new session.
* With an initial streaming window size of 6 and 40 tags per delivery, a web server
* would deliver up to 240 tags (7680 bytes, not including bundled leaseset, etc.)
* in the first volley of the response.
*
* Could the two directions be linked somehow, such that the initial request could
* contain a key or tags for the response?
*
* Should the tag threshold and quantity be adaptive?
*
* Todo: Switch to ConcurrentHashMaps and ReadWriteLocks, only get write lock during cleanup
*
*/
public class TransientSessionKeyManager extends SessionKeyManager {
private Log _log;
@ -126,6 +162,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
}
/* FIXME Exporting non-public type through public API */
/****** leftover from when we had the persistent SKM
protected void setData(Set<TagSet> inboundTagSets, Set<OutboundSession> outboundSessions) {
if (_log.shouldLog(Log.INFO))
_log.info("Loading " + inboundTagSets.size() + " inbound tag sets, and "
@ -152,6 +189,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
_outboundSessions.putAll(sessions);
}
}
******/
/**
* Retrieve the session key currently associated with encryption to the target,
@ -179,13 +217,10 @@ public class TransientSessionKeyManager extends SessionKeyManager {
* Associate a new session key with the specified target. Metrics to determine
* when to expire that key begin with this call.
*
* Unused except in tests?
*/
@Override
public void createSession(PublicKey target, SessionKey key) {
OutboundSession sess = new OutboundSession(target);
sess.setCurrentKey(key);
addSession(sess);
createAndReturnSession(target, key);
}
/**
@ -218,7 +253,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
if (sess.getCurrentKey().equals(key)) {
SessionTag nxt = sess.consumeNext();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tag consumed: " + nxt + " with key: " + key.toBase64());
_log.debug("OB Tag consumed: " + nxt + " with: " + key);
return nxt;
}
if (_log.shouldLog(Log.DEBUG))
@ -261,23 +296,31 @@ public class TransientSessionKeyManager extends SessionKeyManager {
/**
* Take note of the fact that the given sessionTags associated with the key for
* encryption to the target have definitely been received at the target (aka call this
* method after receiving an ack to a message delivering them)
* encryption to the target have been sent. Whether to use the tags immediately
* (i.e. assume they will be received) or to wait until an ack, is implementation dependent.
*
* Here, we wait for the ack if the session is new, otherwise we use right away.
* Will this work???
* If the tags are pipelined sufficiently, it will.
*
* @return the TagSetHandle. Caller MUST subsequently call failTags() or tagsAcked()
* with this handle.
*/
@Override
public void tagsDelivered(PublicKey target, SessionKey key, Set sessionTags) {
public TagSetHandle tagsDelivered(PublicKey target, SessionKey key, Set<SessionTag> sessionTags) {
if (_log.shouldLog(Log.DEBUG)) {
//_log.debug("Tags delivered to set " + set + " on session " + sess);
if (sessionTags.size() > 0)
_log.debug("Tags delivered: " + sessionTags.size() + " for key: " + key.toBase64() + ": " + sessionTags);
_log.debug("Tags delivered: " + sessionTags.size() + " for key: " + key + ": " + sessionTags);
}
OutboundSession sess = getSession(target);
if (sess == null)
sess = createAndReturnSession(target, key);
sess.setCurrentKey(key);
else
sess.setCurrentKey(key);
TagSet set = new TagSet(sessionTags, key, _context.clock().now());
sess.addTags(set);
return set;
}
/**
@ -285,12 +328,44 @@ public class TransientSessionKeyManager extends SessionKeyManager {
* has failed to respond when they should have. This call essentially lets the system recover
* from corrupted tag sets and crashes
*
* @deprecated unused and rather drastic
*/
@Override
public void failTags(PublicKey target) {
removeSession(target);
}
/**
* Mark these tags as invalid, since the peer
* has failed to ack them in time.
*/
@Override
public void failTags(PublicKey target, SessionKey key, TagSetHandle ts) {
OutboundSession sess = getSession(target);
if (sess == null)
return;
if(!key.equals(sess.getCurrentKey()))
return;
sess.failTags((TagSet)ts);
if (_log.shouldLog(Log.DEBUG))
_log.debug("TagSet failed: " + ts);
}
/**
* Mark these tags as acked, start to use them (if we haven't already)
*/
@Override
public void tagsAcked(PublicKey target, SessionKey key, TagSetHandle ts) {
OutboundSession sess = getSession(target);
if (sess == null)
return;
if(!key.equals(sess.getCurrentKey()))
return;
sess.ackTags((TagSet)ts);
if (_log.shouldLog(Log.DEBUG))
_log.debug("TagSet acked: " + ts);
}
/**
* Accept the given tags and associate them with the given key for decryption
*
@ -304,9 +379,9 @@ public class TransientSessionKeyManager extends SessionKeyManager {
for (Iterator<SessionTag> iter = sessionTags.iterator(); iter.hasNext();) {
SessionTag tag = iter.next();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receiving tag " + tag + " for key " + key.toBase64() + " / " + key.toString() + ": tagSet: " + tagSet);
_log.debug("Receiving tag " + tag + " for key " + key + ": tagSet: " + tagSet);
synchronized (_inboundTagSets) {
old = (TagSet)_inboundTagSets.put(tag, tagSet);
old = _inboundTagSets.put(tag, tagSet);
overage = _inboundTagSets.size() - MAX_INBOUND_SESSION_TAGS;
if (old != null) {
if (!old.getAssociatedKey().equals(tagSet.getAssociatedKey())) {
@ -334,9 +409,9 @@ public class TransientSessionKeyManager extends SessionKeyManager {
}
if (_log.shouldLog(Log.WARN)) {
_log.warn("Multiple tags matching! tagSet: " + tagSet + " and old tagSet: " + old + " tag: " + dupTag + "/" + dupTag.toBase64());
_log.warn("Earlier tag set creation: " + old + ": key=" + old.getAssociatedKey().toBase64(), old.getCreatedBy());
_log.warn("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey().toBase64(), tagSet.getCreatedBy());
_log.warn("Multiple tags matching! tagSet: " + tagSet + " and old tagSet: " + old + " tag: " + dupTag + "/" + dupTag);
_log.warn("Earlier tag set creation: " + old + ": key=" + old.getAssociatedKey());
_log.warn("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey());
}
}
@ -410,26 +485,26 @@ public class TransientSessionKeyManager extends SessionKeyManager {
*/
@Override
public SessionKey consumeTag(SessionTag tag) {
if (false) aggressiveExpire();
//if (false) aggressiveExpire();
synchronized (_inboundTagSets) {
TagSet tagSet = (TagSet) _inboundTagSets.remove(tag);
if (tagSet == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cannot consume tag " + tag + " as it is not known");
_log.debug("Cannot consume IB " + tag + " as it is not known");
return null;
}
tagSet.consume(tag);
SessionKey key = tagSet.getAssociatedKey();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Consuming tag " + tag.toString() + " for sessionKey " + key.toBase64() + " / " + key.toString() + " on tagSet: " + tagSet);
_log.debug("Consuming IB " + tag + " for " + key + " on: " + tagSet);
return key;
}
}
private OutboundSession getSession(PublicKey target) {
synchronized (_outboundSessions) {
return (OutboundSession) _outboundSessions.get(target);
return _outboundSessions.get(target);
}
}
@ -443,7 +518,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
if (target == null) return;
OutboundSession session = null;
synchronized (_outboundSessions) {
session = (OutboundSession)_outboundSessions.remove(target);
session = _outboundSessions.remove(target);
}
if ( (session != null) && (_log.shouldLog(Log.WARN)) )
_log.warn("Removing session tags with " + session.availableTags() + " available for "
@ -461,11 +536,11 @@ public class TransientSessionKeyManager extends SessionKeyManager {
int remaining = 0;
long now = _context.clock().now();
StringBuilder buf = null;
StringBuilder bufSummary = null;
//StringBuilder bufSummary = null;
if (_log.shouldLog(Log.DEBUG)) {
buf = new StringBuilder(128);
buf.append("Expiring inbound: ");
bufSummary = new StringBuilder(1024);
//bufSummary = new StringBuilder(1024);
}
synchronized (_inboundTagSets) {
for (Iterator<SessionTag> iter = _inboundTagSets.keySet().iterator(); iter.hasNext();) {
@ -477,10 +552,10 @@ public class TransientSessionKeyManager extends SessionKeyManager {
iter.remove();
removed++;
if (buf != null)
buf.append(tag.toString()).append(" @ age ").append(DataHelper.formatDuration(age));
} else if (false && (bufSummary != null) ) {
bufSummary.append("\nTagSet: " + ts.toString() + ", key: " + ts.getAssociatedKey().toBase64()+"/" + ts.getAssociatedKey().toString()
+ ": tag: " + tag.toString());
buf.append(tag).append(" @ age ").append(DataHelper.formatDuration(age));
//} else if (false && (bufSummary != null) ) {
// bufSummary.append("\nTagSet: " + ts + ", key: " + ts.getAssociatedKey()
// + ": tag: " + tag);
}
}
remaining = _inboundTagSets.size();
@ -488,8 +563,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
_context.statManager().addRateData("crypto.sessionTagsRemaining", remaining, 0);
if ( (buf != null) && (removed > 0) )
_log.debug(buf.toString());
if (bufSummary != null)
_log.debug("Cleaning up with remaining: " + bufSummary.toString());
//if (bufSummary != null)
// _log.debug("Cleaning up with remaining: " + bufSummary.toString());
//_log.warn("Expiring tags: [" + tagsToDrop + "]");
@ -498,9 +573,11 @@ public class TransientSessionKeyManager extends SessionKeyManager {
PublicKey key = iter.next();
OutboundSession sess = _outboundSessions.get(key);
removed += sess.expireTags();
if (sess.availableTags() <= 0) {
// don't kill a new session or one that's temporarily out of tags
if (sess.getLastUsedDate() < now - (SESSION_LIFETIME_MAX_MS / 2) &&
sess.availableTags() <= 0) {
iter.remove();
removed++;
removed++; // just to have a non-zero return value?
}
}
}
@ -563,7 +640,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
int size = ts.getTags().size();
total += size;
buf.append("<li><b>Sent:</b> ").append(DataHelper.formatDuration(now - ts.getDate())).append(" ago with ");
buf.append(size).append(" tags remaining</li>");
buf.append(size).append(" tags remaining; acked? ").append(ts.getAcked()).append("</li>");
}
buf.append("</ul></td></tr>\n");
out.write(buf.toString());
@ -580,18 +657,27 @@ public class TransientSessionKeyManager extends SessionKeyManager {
* Just for the HTML method above so we can see what's going on easier
* Earliest first
*/
private class TagSetComparator implements Comparator {
private static class TagSetComparator implements Comparator {
public int compare(Object l, Object r) {
return (int) (((TagSet)l).getDate() - ((TagSet)r).getDate());
}
}
class OutboundSession {
private class OutboundSession {
private PublicKey _target;
private SessionKey _currentKey;
private long _established;
private long _lastUsed;
/** before the first ack, all tagsets go here. These are never expired, we rely
on the callers to call failTags() or ackTags() to remove them from this list. */
private /* FIXME final FIXME */ List<TagSet> _unackedTagSets;
/**
* As tagsets are acked, they go here.
* After the first ack, new tagsets go here (i.e. presumed acked)
*/
private /* FIXME final FIXME */ List<TagSet> _tagSets;
/** set to true after first tagset is acked */
private boolean _acked;
public OutboundSession(PublicKey target) {
this(target, null, _context.clock().now(), _context.clock().now(), new ArrayList());
@ -602,14 +688,43 @@ public class TransientSessionKeyManager extends SessionKeyManager {
_currentKey = curKey;
_established = established;
_lastUsed = lastUsed;
_tagSets = tagSets;
_unackedTagSets = tagSets;
_tagSets = new ArrayList();
}
/** list of TagSet objects */
/**
* @return list of TagSet objects
* This is used only by renderStatusHTML().
* It includes both acked and unacked TagSets.
*/
List<TagSet> getTagSets() {
synchronized (_tagSets) {
return new ArrayList(_tagSets);
List<TagSet> rv;
synchronized (_unackedTagSets) {
rv = new ArrayList(_unackedTagSets);
}
synchronized (_tagSets) {
rv.addAll(_tagSets);
}
return rv;
}
/**
* got an ack for these tags
* For tagsets delivered after the session was acked, this is a nop
* because the tagset was originally placed directly on the acked list.
*/
void ackTags(TagSet set) {
if (_unackedTagSets.remove(set)) {
_tagSets.add(set);
_acked = true;
}
set.setAcked();
}
/** didn't get an ack for these tags */
void failTags(TagSet set) {
_unackedTagSets.remove(set);
_tagSets.remove(set);
}
public PublicKey getTarget() {
@ -656,7 +771,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
int removed = 0;
synchronized (_tagSets) {
for (int i = 0; i < _tagSets.size(); i++) {
TagSet set = (TagSet) _tagSets.get(i);
TagSet set = _tagSets.get(i);
if (set.getDate() + SESSION_TAG_DURATION_MS <= now) {
_tagSets.remove(i);
i--;
@ -672,7 +787,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
_lastUsed = now;
synchronized (_tagSets) {
while (_tagSets.size() > 0) {
TagSet set = (TagSet) _tagSets.get(0);
TagSet set = _tagSets.get(0);
if (set.getDate() + SESSION_TAG_DURATION_MS > now) {
SessionTag tag = set.consumeNext();
if (tag != null) return tag;
@ -686,12 +801,13 @@ public class TransientSessionKeyManager extends SessionKeyManager {
return null;
}
/** @return the total number of tags in acked TagSets */
public int availableTags() {
int tags = 0;
long now = _context.clock().now();
synchronized (_tagSets) {
for (int i = 0; i < _tagSets.size(); i++) {
TagSet set = (TagSet) _tagSets.get(i);
TagSet set = _tagSets.get(i);
if (set.getDate() + SESSION_TAG_DURATION_MS > now)
tags += set.getTags().size();
}
@ -719,19 +835,31 @@ public class TransientSessionKeyManager extends SessionKeyManager {
return -1;
}
/**
* If the session has never been acked, put the TagSet on the unacked list.
* Otherwise, consider it good right away.
*/
public void addTags(TagSet set) {
_lastUsed = _context.clock().now();
synchronized (_tagSets) {
_tagSets.add(set);
if (_acked) {
synchronized (_tagSets) {
_tagSets.add(set);
}
} else {
synchronized (_unackedTagSets) {
_unackedTagSets.add(set);
}
}
}
}
static class TagSet {
private static class TagSet implements TagSetHandle {
private Set<SessionTag> _sessionTags;
private SessionKey _key;
private long _date;
private Exception _createdBy;
//private Exception _createdBy;
/** only used in renderStatusHTML() for debugging */
private boolean _acked;
public TagSet(Set<SessionTag> tags, SessionKey key, long date) {
if (key == null) throw new IllegalArgumentException("Missing key");
@ -739,12 +867,12 @@ public class TransientSessionKeyManager extends SessionKeyManager {
_sessionTags = tags;
_key = key;
_date = date;
if (true) {
long now = I2PAppContext.getGlobalContext().clock().now();
_createdBy = new Exception("Created by: key=" + _key.toBase64() + " on "
+ new Date(now) + "/" + now
+ " via " + Thread.currentThread().getName());
}
//if (true) {
// long now = I2PAppContext.getGlobalContext().clock().now();
// _createdBy = new Exception("Created by: key=" + _key.toBase64() + " on "
// + new Date(now) + "/" + now
// + " via " + Thread.currentThread().getName());
//}
}
/** when the tag set was created */
@ -770,22 +898,26 @@ public class TransientSessionKeyManager extends SessionKeyManager {
}
public void consume(SessionTag tag) {
if (contains(tag)) {
_sessionTags.remove(tag);
}
_sessionTags.remove(tag);
}
/** let's do this without counting the elements first */
public SessionTag consumeNext() {
if (_sessionTags.size() <= 0) {
SessionTag first;
try {
first = _sessionTags.iterator().next();
} catch (NoSuchElementException nsee) {
return null;
}
SessionTag first = (SessionTag) _sessionTags.iterator().next();
_sessionTags.remove(first);
return first;
}
public Exception getCreatedBy() { return _createdBy; }
//public Exception getCreatedBy() { return _createdBy; }
public void setAcked() { _acked = true; }
/** only used in renderStatusHTML() for debugging */
public boolean getAcked() { return _acked; }
@Override
public int hashCode() {
@ -800,9 +932,19 @@ public class TransientSessionKeyManager extends SessionKeyManager {
public boolean equals(Object o) {
if ((o == null) || !(o instanceof TagSet)) return false;
TagSet ts = (TagSet) o;
return DataHelper.eq(ts.getAssociatedKey(), getAssociatedKey())
return DataHelper.eq(ts.getAssociatedKey(), _key)
//&& DataHelper.eq(ts.getTags(), getTags())
&& ts.getDate() == getDate();
&& ts.getDate() == _date;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(256);
buf.append("TagSet established: ").append(new Date(_date));
buf.append(" Session key: ").append(_key.toBase64());
buf.append(" Size: ").append(_sessionTags.size());
buf.append(" Acked? ").append(_acked);
return buf.toString();
}
}
}