* SessionKeyManager:

- Don't use unacked tagsets after consecutive ack failures
      and revert to full ElGamal if necessary (ticket #574)
    - Synchronize creation of new sessions to prevent dups
    - Don't remove an unacked session until it's really out of tags
    - Failsafe removal of old unacked tagsets
    - Cleanups, final, comments, log tweaks, debug.jsp tweaks, synchronization tweaks
This commit is contained in:
zzz
2012-03-08 17:48:19 +00:00
parent 1e978ea435
commit 97f402be0b
5 changed files with 263 additions and 90 deletions

View File

@ -38,15 +38,28 @@ public class SessionKeyManager {
* Retrieve the session key currently associated with encryption to the target,
* or null if a new session key should be generated.
*
* Warning - don't generate a new session if this returns null, it's racy, use getCurrentOrNewKey()
*/
public SessionKey getCurrentKey(PublicKey target) {
return null;
}
/**
* Retrieve the session key currently associated with encryption to the target.
* Generates a new session and session key if not previously exising.
*
* @return non-null
* @since 0.9
*/
public SessionKey getCurrentOrNewKey(PublicKey target) {
return null;
}
/**
* Associate a new session key with the specified target. Metrics to determine
* when to expire that key begin with this call.
*
* @deprecated racy
*/
public void createSession(PublicKey target, SessionKey key) { // nop
}
@ -54,6 +67,7 @@ public class SessionKeyManager {
/**
* Generate a new session key and associate it with the specified target.
*
* @deprecated racy
*/
public SessionKey createSession(PublicKey target) {
SessionKey key = KeyGenerator.getInstance().generateSessionKey();

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
@ -74,13 +75,16 @@ import net.i2p.util.SimpleTimer;
*
*/
public class TransientSessionKeyManager extends SessionKeyManager {
private Log _log;
private final Log _log;
/** Map allowing us to go from the targeted PublicKey to the OutboundSession used */
private final Map<PublicKey, OutboundSession> _outboundSessions;
/** Map allowing us to go from a SessionTag to the containing TagSet */
private final Map<SessionTag, TagSet> _inboundTagSets;
protected I2PAppContext _context;
protected final I2PAppContext _context;
private volatile boolean _alive;
/** for debugging */
private final AtomicInteger _rcvTagSetID = new AtomicInteger();
private final AtomicInteger _sentTagSetID = new AtomicInteger();
/**
* Let session tags sit around for 10 minutes before expiring them. We can now have such a large
@ -119,7 +123,6 @@ public class TransientSessionKeyManager extends SessionKeyManager {
_alive = true;
SimpleScheduler.getInstance().addEvent(new CleanupEvent(), 60*1000);
}
private TransientSessionKeyManager() { this(null); }
@Override
public void shutdown() {
@ -192,6 +195,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
* Retrieve the session key currently associated with encryption to the target,
* or null if a new session key should be generated.
*
* Warning - don't generate a new session if this returns null, it's racy, use getCurrentOrNewKey()
*/
@Override
public SessionKey getCurrentKey(PublicKey target) {
@ -204,16 +208,42 @@ public class TransientSessionKeyManager extends SessionKeyManager {
+ new Date(sess.getEstablishedDate())
+ " but not used for "
+ (now-sess.getLastUsedDate())
+ "ms with target " + target);
+ "ms with target " + toString(target));
return null;
}
return sess.getCurrentKey();
}
/**
* Retrieve the session key currently associated with encryption to the target.
* Generates a new session and session key if not previously exising.
*
* @return non-null
* @since 0.9
*/
@Override
public SessionKey getCurrentOrNewKey(PublicKey target) {
synchronized (_outboundSessions) {
OutboundSession sess = _outboundSessions.get(target);
if (sess != null) {
long now = _context.clock().now();
if (sess.getLastUsedDate() < now - SESSION_LIFETIME_MAX_MS)
sess = null;
}
if (sess == null) {
SessionKey key = _context.keyGenerator().generateSessionKey();
sess = createAndReturnSession(target, key);
return key;
}
return sess.getCurrentKey();
}
}
/**
* Associate a new session key with the specified target. Metrics to determine
* when to expire that key begin with this call.
*
* @deprecated racy
*/
@Override
public void createSession(PublicKey target, SessionKey key) {
@ -226,7 +256,9 @@ public class TransientSessionKeyManager extends SessionKeyManager {
*
*/
private OutboundSession createAndReturnSession(PublicKey target, SessionKey key) {
OutboundSession sess = new OutboundSession(target);
if (_log.shouldLog(Log.INFO))
_log.info("New OB session, sesskey: " + key + " target: " + toString(target));
OutboundSession sess = new OutboundSession(_context, _log, target);
sess.setCurrentKey(key);
addSession(sess);
return sess;
@ -243,18 +275,19 @@ public class TransientSessionKeyManager extends SessionKeyManager {
public SessionTag consumeNextAvailableTag(PublicKey target, SessionKey key) {
OutboundSession sess = getSession(target);
if (sess == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No session for " + target);
if (_log.shouldLog(Log.WARN))
_log.warn("No session for " + toString(target));
return null;
}
if (sess.getCurrentKey().equals(key)) {
SessionTag nxt = sess.consumeNext();
if (_log.shouldLog(Log.DEBUG))
_log.debug("OB Tag consumed: " + nxt + " with: " + key);
// logged in OutboundSession
//if (nxt != null && _log.shouldLog(Log.DEBUG))
// _log.debug("OB Tag consumed: " + nxt + " with: " + key);
return nxt;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Key does not match existing key, no tag");
if (_log.shouldLog(Log.WARN))
_log.warn("Key does not match existing key, no tag");
return null;
}
@ -305,18 +338,20 @@ public class TransientSessionKeyManager extends SessionKeyManager {
*/
@Override
public TagSetHandle tagsDelivered(PublicKey target, SessionKey key, Set<SessionTag> sessionTags) {
if (_log.shouldLog(Log.DEBUG)) {
//_log.debug("Tags delivered to set " + set + " on session " + sess);
if (!sessionTags.isEmpty())
_log.debug("Tags delivered: " + sessionTags.size() + " for key: " + key + ": " + sessionTags);
}
// if this is ever null, this is racy and needs synch
OutboundSession sess = getSession(target);
if (sess == null)
if (sess == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No session for delivered TagSet to target: " + toString(target));
sess = createAndReturnSession(target, key);
else
} else {
sess.setCurrentKey(key);
TagSet set = new TagSet(sessionTags, key, _context.clock().now());
}
TagSet set = new TagSet(sessionTags, key, _context.clock().now(), _sentTagSetID.incrementAndGet());
sess.addTags(set);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tags delivered: " + set +
" target: " + toString(target) /** + ": " + sessionTags */ );
return set;
}
@ -339,13 +374,19 @@ public class TransientSessionKeyManager extends SessionKeyManager {
@Override
public void failTags(PublicKey target, SessionKey key, TagSetHandle ts) {
OutboundSession sess = getSession(target);
if (sess == null)
if (sess == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No session for failed TagSet: " + ts);
return;
if(!key.equals(sess.getCurrentKey()))
}
if(!key.equals(sess.getCurrentKey())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Wrong session key (wanted " + sess.getCurrentKey() + ") for failed TagSet: " + ts);
return;
}
if (_log.shouldLog(Log.WARN))
_log.warn("TagSet failed: " + ts);
sess.failTags((TagSet)ts);
if (_log.shouldLog(Log.DEBUG))
_log.debug("TagSet failed: " + ts);
}
/**
@ -354,13 +395,19 @@ public class TransientSessionKeyManager extends SessionKeyManager {
@Override
public void tagsAcked(PublicKey target, SessionKey key, TagSetHandle ts) {
OutboundSession sess = getSession(target);
if (sess == null)
if (sess == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No session for acked TagSet: " + ts);
return;
if(!key.equals(sess.getCurrentKey()))
}
if(!key.equals(sess.getCurrentKey())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Wrong session key (wanted " + sess.getCurrentKey() + ") for acked TagSet: " + ts);
return;
sess.ackTags((TagSet)ts);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("TagSet acked: " + ts);
sess.ackTags((TagSet)ts);
}
/**
@ -370,13 +417,15 @@ public class TransientSessionKeyManager extends SessionKeyManager {
@Override
public void tagsReceived(SessionKey key, Set<SessionTag> sessionTags) {
int overage = 0;
TagSet tagSet = new TagSet(sessionTags, key, _context.clock().now());
TagSet tagSet = new TagSet(sessionTags, key, _context.clock().now(), _rcvTagSetID.incrementAndGet());
if (_log.shouldLog(Log.INFO))
_log.info("Received " + tagSet);
TagSet old = null;
SessionTag dupTag = null;
for (Iterator<SessionTag> iter = sessionTags.iterator(); iter.hasNext();) {
SessionTag tag = iter.next();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receiving tag " + tag + " for key " + key + ": tagSet: " + tagSet);
_log.debug("Receiving tag " + tag + " in tagSet: " + tagSet);
synchronized (_inboundTagSets) {
old = _inboundTagSets.put(tag, tagSet);
overage = _inboundTagSets.size() - MAX_INBOUND_SESSION_TAGS;
@ -415,8 +464,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
if (overage > 0)
clearExcess(overage);
if ( (sessionTags.isEmpty()) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Received 0 tags for key " + key);
//if ( (sessionTags.isEmpty()) && (_log.shouldLog(Log.DEBUG)) )
// _log.debug("Received 0 tags for key " + key);
//if (false) aggressiveExpire();
}
@ -494,7 +543,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
SessionKey key = tagSet.getAssociatedKey();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Consuming IB " + tag + " for " + key + " on: " + tagSet);
_log.debug("IB Tag consumed: " + tag + " from: " + tagSet);
return key;
}
}
@ -607,7 +656,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
TagSet ts = siter.next();
int size = ts.getTags().size();
total += size;
buf.append("<li><b>Received:</b> ").append(DataHelper.formatDuration(now - ts.getDate())).append(" ago with ");
buf.append("<li><b>ID: ").append(ts.getID())
.append(" Received:</b> ").append(DataHelper.formatDuration2(now - ts.getDate())).append(" ago with ");
buf.append(size).append(" tags remaining</li>");
}
buf.append("</ul></td></tr>\n");
@ -615,7 +665,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
buf.setLength(0);
}
buf.append("<tr><th colspan=\"2\">Total tags: ").append(total).append(" (");
buf.append(DataHelper.formatSize(32*total)).append("B)</th></tr>\n" +
buf.append(DataHelper.formatSize2(32*total)).append("B)</th></tr>\n" +
"</table>" +
"<h2><b>Outbound sessions</b></h2>" +
"<table>");
@ -625,9 +675,10 @@ public class TransientSessionKeyManager extends SessionKeyManager {
OutboundSession sess = iter.next();
Set<TagSet> sets = new TreeSet(new TagSetComparator());
sets.addAll(sess.getTagSets());
buf.append("<tr><td><b>Target public key:</b> ").append(sess.getTarget().toBase64().substring(0, 20)).append("...<br>" +
"<b>Established:</b> ").append(DataHelper.formatDuration(now - sess.getEstablishedDate())).append(" ago<br>" +
"<b>Last Used:</b> ").append(DataHelper.formatDuration(now - sess.getLastUsedDate())).append(" ago<br>" +
buf.append("<tr><td><b>Target public key:</b> ").append(toString(sess.getTarget())).append("<br>" +
"<b>Established:</b> ").append(DataHelper.formatDuration2(now - sess.getEstablishedDate())).append(" ago<br>" +
"<b>Ack Received?</b> ").append(sess.getAckReceived()).append("<br>" +
"<b>Last Used:</b> ").append(DataHelper.formatDuration2(now - sess.getLastUsedDate())).append(" ago<br>" +
"<b>Session key:</b> ").append(sess.getCurrentKey().toBase64()).append("</td>" +
"<td><b># Sets:</b> ").append(sess.getTagSets().size()).append("</td></tr>" +
"<tr><td colspan=\"2\"><ul>");
@ -635,7 +686,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
TagSet ts = siter.next();
int size = ts.getTags().size();
total += size;
buf.append("<li><b>Sent:</b> ").append(DataHelper.formatDuration(now - ts.getDate())).append(" ago with ");
buf.append("<li><b>ID: ").append(ts.getID())
.append(" Sent:</b> ").append(DataHelper.formatDuration2(now - ts.getDate())).append(" ago with ");
buf.append(size).append(" tags remaining; acked? ").append(ts.getAcked()).append("</li>");
}
buf.append("</ul></td></tr>\n");
@ -643,12 +695,22 @@ public class TransientSessionKeyManager extends SessionKeyManager {
buf.setLength(0);
}
buf.append("<tr><th colspan=\"2\">Total tags: ").append(total).append(" (");
buf.append(DataHelper.formatSize(32*total)).append("B)</th></tr>\n" +
buf.append(DataHelper.formatSize2(32*total)).append("B)</th></tr>\n" +
"</table>");
out.write(buf.toString());
}
/**
* For debugging
* @since 0.9
*/
private static String toString(PublicKey target) {
if (target == null)
return "null";
return target.toBase64().substring(0, 20) + "...";
}
/**
* Just for the HTML method above so we can see what's going on easier
* Earliest first
@ -659,28 +721,51 @@ public class TransientSessionKeyManager extends SessionKeyManager {
}
}
/** fixme pass in context and change to static */
private class OutboundSession {
private PublicKey _target;
/**
* The state for a crypto session to a single public key
*/
private static class OutboundSession {
private final I2PAppContext _context;
private final Log _log;
private final PublicKey _target;
private SessionKey _currentKey;
private long _established;
private final long _established;
private long _lastUsed;
/** before the first ack, all tagsets go here. These are never expired, we rely
on the callers to call failTags() or ackTags() to remove them from this list. */
private /* FIXME final FIXME */ List<TagSet> _unackedTagSets;
/**
* Before the first ack, all tagsets go here. These are never expired, we rely
* on the callers to call failTags() or ackTags() to remove them from this list.
* Actually we now do a failsafe expire.
* Synch on _tagSets to access this.
*/
private final List<TagSet> _unackedTagSets;
/**
* As tagsets are acked, they go here.
* After the first ack, new tagsets go here (i.e. presumed acked)
*/
private /* FIXME final FIXME */ List<TagSet> _tagSets;
/** set to true after first tagset is acked */
private boolean _acked;
private final List<TagSet> _tagSets;
/**
* Set to true after first tagset is acked.
* Upon repeated failures, we may revert back to false.
* This prevents us getting "stuck" forever, using tags that weren't acked
* to deliver the next set of tags.
*/
private volatile boolean _acked;
/**
* Fail count
* Synch on _tagSets to access this.
*/
private int _consecutiveFailures;
public OutboundSession(PublicKey target) {
this(target, null, _context.clock().now(), _context.clock().now(), new ArrayList());
private static final int MAX_FAILS = 2;
public OutboundSession(I2PAppContext ctx, Log log, PublicKey target) {
this(ctx, log, target, null, ctx.clock().now(), ctx.clock().now(), new ArrayList());
}
OutboundSession(PublicKey target, SessionKey curKey, long established, long lastUsed, List<TagSet> tagSets) {
OutboundSession(I2PAppContext ctx, Log log, PublicKey target, SessionKey curKey,
long established, long lastUsed, List<TagSet> tagSets) {
_context = ctx;
_log = log;
_target = target;
_currentKey = curKey;
_established = established;
@ -711,9 +796,17 @@ public class TransientSessionKeyManager extends SessionKeyManager {
void ackTags(TagSet set) {
synchronized (_tagSets) {
if (_unackedTagSets.remove(set)) {
// we could perhaps use it even if not previuosly in unacked,
// i.e. it was expired already, but _tagSets is a list not a set...
_tagSets.add(set);
_acked = true;
} else if (_log.shouldLog(Log.WARN)) {
if(!_tagSets.contains(set))
_log.warn("Ack of unknown tagset: " + set);
else if (set.getAcked())
_log.warn("Dup ack of tagset: " + set);
}
_acked = true;
_consecutiveFailures = 0;
}
set.setAcked();
}
@ -722,7 +815,29 @@ public class TransientSessionKeyManager extends SessionKeyManager {
void failTags(TagSet set) {
synchronized (_tagSets) {
_unackedTagSets.remove(set);
_tagSets.remove(set);
if (_tagSets.remove(set)) {
if (++_consecutiveFailures >= MAX_FAILS) {
// revert back to non-speculative ack mode,
// and force full ElG next time by reclassifying all tagsets that weren't really acked
_acked = false;
int acked = 0;
int unacked = 0;
for (Iterator<TagSet> iter = _tagSets.iterator(); iter.hasNext(); ) {
TagSet ts = iter.next();
if (!ts.getAcked()) {
iter.remove();
_unackedTagSets.add(ts);
unacked++;
} else {
acked++;
}
}
if (_log.shouldLog(Log.WARN))
_log.warn(_consecutiveFailures + " consecutive failed tagset deliveries to " + _currentKey
+ ": reverting to full ElG and un-acking " + unacked + " unacked tag sets, with "
+ acked + " remaining acked tag sets");
}
}
}
}
@ -738,16 +853,18 @@ public class TransientSessionKeyManager extends SessionKeyManager {
_lastUsed = _context.clock().now();
if (_currentKey != null) {
if (!_currentKey.equals(key)) {
int dropped = 0;
List<TagSet> sets = _tagSets;
_tagSets = new ArrayList();
for (int i = 0; i < sets.size(); i++) {
TagSet set = (TagSet) sets.get(i);
dropped += set.getTags().size();
synchronized (_tagSets) {
if (_log.shouldLog(Log.WARN)) {
int dropped = 0;
for (TagSet set : _tagSets) {
dropped += set.getTags().size();
}
_log.warn("Rekeyed from " + _currentKey + " to " + key
+ ": dropping " + dropped + " session tags", new Exception());
}
_acked = false;
_tagSets.clear();
}
if (_log.shouldLog(Log.INFO))
_log.info("Rekeyed from " + _currentKey + " to " + key
+ ": dropping " + dropped + " session tags");
}
}
_currentKey = key;
@ -769,14 +886,23 @@ public class TransientSessionKeyManager extends SessionKeyManager {
long now = _context.clock().now();
int removed = 0;
synchronized (_tagSets) {
for (int i = 0; i < _tagSets.size(); i++) {
TagSet set = _tagSets.get(i);
for (Iterator<TagSet> iter = _tagSets.iterator(); iter.hasNext(); ) {
TagSet set = iter.next();
if (set.getDate() + SESSION_TAG_DURATION_MS <= now) {
_tagSets.remove(i);
i--;
iter.remove();
removed++;
}
}
// failsafe, sometimes these are sticking around, not sure why, so clean them periodically
if ((now & 0x0f) == 0) {
for (Iterator<TagSet> iter = _unackedTagSets.iterator(); iter.hasNext(); ) {
TagSet set = iter.next();
if (set.getDate() + SESSION_TAG_DURATION_MS <= now) {
iter.remove();
removed++;
}
}
}
}
return removed;
}
@ -789,10 +915,16 @@ public class TransientSessionKeyManager extends SessionKeyManager {
TagSet set = _tagSets.get(0);
if (set.getDate() + SESSION_TAG_DURATION_MS > now) {
SessionTag tag = set.consumeNext();
if (tag != null) return tag;
if (tag != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("OB Tag consumed: " + tag + " from: " + set);
return tag;
} else if (_log.shouldLog(Log.INFO)) {
_log.info("Removing empty " + set);
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("TagSet from " + new Date(set.getDate()) + " expired");
_log.info("Expired " + set);
}
_tagSets.remove(0);
}
@ -812,7 +944,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
// so tags are sent when the acked tags are below
// 30, 17, and 4.
if (!set.getAcked())
sz /= 3;
// round up so we don't report 0 when we have 1 or 2 remaining and get the session removed
sz = (sz + 2) / 3;
tags += sz;
}
}
@ -846,32 +979,36 @@ public class TransientSessionKeyManager extends SessionKeyManager {
*/
public void addTags(TagSet set) {
_lastUsed = _context.clock().now();
if (_acked) {
synchronized (_tagSets) {
synchronized (_tagSets) {
if (_acked)
_tagSets.add(set);
}
} else {
synchronized (_unackedTagSets) {
else
_unackedTagSets.add(set);
}
}
}
/** @since 0.9 for debugging */
public boolean getAckReceived() {
return _acked;
}
}
private static class TagSet implements TagSetHandle {
private Set<SessionTag> _sessionTags;
private SessionKey _key;
private long _date;
private final Set<SessionTag> _sessionTags;
private final SessionKey _key;
private final long _date;
private final int _id;
//private Exception _createdBy;
/** did we get an ack for this tagset? */
/** did we get an ack for this tagset? Only for outbound tagsets */
private boolean _acked;
public TagSet(Set<SessionTag> tags, SessionKey key, long date) {
public TagSet(Set<SessionTag> tags, SessionKey key, long date, int id) {
if (key == null) throw new IllegalArgumentException("Missing key");
if (tags == null) throw new IllegalArgumentException("Missing tags");
_sessionTags = tags;
_key = key;
_date = date;
_id = id;
//if (true) {
// long now = I2PAppContext.getGlobalContext().clock().now();
// _createdBy = new Exception("Created by: key=" + _key.toBase64() + " on "
@ -885,9 +1022,9 @@ public class TransientSessionKeyManager extends SessionKeyManager {
return _date;
}
void setDate(long when) {
_date = when;
}
//void setDate(long when) {
// _date = when;
//}
/** tags still available */
public Set<SessionTag> getTags() {
@ -898,15 +1035,24 @@ public class TransientSessionKeyManager extends SessionKeyManager {
return _key;
}
/**
* Caller must synch.
*/
public boolean contains(SessionTag tag) {
return _sessionTags.contains(tag);
}
/**
* Caller must synch.
*/
public void consume(SessionTag tag) {
_sessionTags.remove(tag);
}
/** let's do this without counting the elements first */
/**
* Let's do this without counting the elements first.
* Caller must synch.
*/
public SessionTag consumeNext() {
SessionTag first;
try {
@ -943,11 +1089,16 @@ public class TransientSessionKeyManager extends SessionKeyManager {
}
******/
/** @since 0.9 for debugging */
public int getID() {
return _id;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(256);
buf.append("TagSet established: ").append(new Date(_date));
buf.append(" Session key: ").append(_key.toBase64());
buf.append("TagSet #").append(_id).append(" created: ").append(new Date(_date));
buf.append(" Session key: ").append(_key);
buf.append(" Size: ").append(_sessionTags.size());
buf.append(" Acked? ").append(_acked);
return buf.toString();