forked from I2P_Developers/i2p.i2p
2004-11-21 jrandom
* Destroy ElGamal/AES+SessionTag keys after 15 minutes of inactivity rather that every 15 minutes, and increase the warning period in which we refresh tags from 30s to 2 minutes. * Bugfix for a rare problem closing an I2PTunnel stream where we'd fail to close the I2PSocket (leaving it to timeout).
This commit is contained in:
@ -118,15 +118,23 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
if ( (tagsSent == null) || (tagsSent.size() <= 0) ) {
|
||||
if (oldTags < 10) {
|
||||
sentTags = createNewTags(50);
|
||||
//_log.error("** sendBestEffort only had " + oldTags + " adding 50");
|
||||
} else if (availTimeLeft < 30 * 1000) {
|
||||
// if we have > 10 tags, but they expire in under 30 seconds, we want more
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding 50");
|
||||
} else if (availTimeLeft < 2 * 60 * 1000) {
|
||||
// if we have > 10 tags, but they expire in under 2 minutes, we want more
|
||||
sentTags = createNewTags(50);
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding 50 new ones");
|
||||
//_log.error("** sendBestEffort available time left " + availTimeLeft);
|
||||
} else {
|
||||
//_log.error("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("sendBestEffort is sending " + tagsSent.size() + " with " + availTimeLeft
|
||||
+ "ms left, " + oldTags + " tags known and "
|
||||
+ (tag == null ? "no tag" : " a valid tag"));
|
||||
}
|
||||
|
||||
SessionKey newKey = null;
|
||||
@ -184,7 +192,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
long afterRemovingSync = _context.clock().now();
|
||||
boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId()
|
||||
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
|
||||
+ " / " + state.getNonce() + " found = " + found);
|
||||
if (found) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
@ -210,7 +218,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
Set sentTags = null;
|
||||
if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) {
|
||||
sentTags = createNewTags(50);
|
||||
} else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 30 * 1000) {
|
||||
} else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 2 * 60 * 1000) {
|
||||
// if we have > 10 tags, but they expire in under 30 seconds, we want more
|
||||
sentTags = createNewTags(50);
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones");
|
||||
@ -267,9 +275,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
_sendingStates.remove(state);
|
||||
}
|
||||
long afterRemovingSync = _context.clock().now();
|
||||
boolean guaranteed = isGuaranteed();
|
||||
boolean found = false;
|
||||
boolean accepted = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
|
||||
if (isGuaranteed())
|
||||
if (guaranteed)
|
||||
found = state.received(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
|
||||
else
|
||||
found = accepted;
|
||||
@ -286,7 +295,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
+ ")");
|
||||
//if (true)
|
||||
// throw new OutOfMemoryError("not really an OOM, but more of jr fucking shit up");
|
||||
nackTags(state);
|
||||
if (guaranteed)
|
||||
nackTags(state);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -294,19 +304,24 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId()
|
||||
+ " / " + state.getNonce() + " found = " + found);
|
||||
|
||||
// WARNING: this will always be false for mode=BestEffort, even though the message may go
|
||||
// through, causing every datagram to be ElGamal encrypted!
|
||||
// TODO: Fix this to include support for acks received after the sendMessage completes
|
||||
if (found) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "
|
||||
+ payload.length + " bytes");
|
||||
ackTags(state);
|
||||
// the 'found' value is only useful for mode=Guaranteed, as mode=BestEffort
|
||||
// doesn't block
|
||||
if (guaranteed) {
|
||||
if (found) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "
|
||||
+ payload.length + " bytes");
|
||||
ackTags(state);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with "
|
||||
+ payload.length + " bytes");
|
||||
nackTags(state);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with "
|
||||
_log.info(getPrefix() + "Message send enqueued after " + state.getElapsed() + "ms with "
|
||||
+ payload.length + " bytes");
|
||||
nackTags(state);
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ public class PersistentSessionKeyManager extends TransientSessionKeyManager {
|
||||
tag.setData(val);
|
||||
tags.add(tag);
|
||||
}
|
||||
TagSet ts = new TagSet(tags, key);
|
||||
TagSet ts = new TagSet(tags, key, _context.clock().now());
|
||||
ts.setDate(date.getTime());
|
||||
return ts;
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
private Log _log;
|
||||
private Map _outboundSessions; // PublicKey --> OutboundSession
|
||||
private Map _inboundTagSets; // SessionTag --> TagSet
|
||||
protected I2PAppContext _context;
|
||||
|
||||
/**
|
||||
* Let session tags sit around for 10 minutes before expiring them. We can now have such a large
|
||||
@ -62,6 +63,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
public TransientSessionKeyManager(I2PAppContext context) {
|
||||
super(context);
|
||||
_log = context.logManager().getLog(TransientSessionKeyManager.class);
|
||||
_context = context;
|
||||
_outboundSessions = new HashMap(64);
|
||||
_inboundTagSets = new HashMap(1024);
|
||||
}
|
||||
@ -116,12 +118,14 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
public SessionKey getCurrentKey(PublicKey target) {
|
||||
OutboundSession sess = getSession(target);
|
||||
if (sess == null) return null;
|
||||
long now = Clock.getInstance().now();
|
||||
if (sess.getEstablishedDate() < now - SESSION_LIFETIME_MAX_MS) {
|
||||
long now = _context.clock().now();
|
||||
if (sess.getLastUsedDate() < now - SESSION_LIFETIME_MAX_MS) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Expiring old session key established on "
|
||||
+ new Date(sess.getEstablishedDate())
|
||||
+ " with target " + target);
|
||||
+ " but not used for "
|
||||
+ (now-sess.getLastUsedDate())
|
||||
+ "ms with target " + target);
|
||||
return null;
|
||||
}
|
||||
return sess.getCurrentKey();
|
||||
@ -185,7 +189,11 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
OutboundSession sess = getSession(target);
|
||||
if (sess == null) { return 0; }
|
||||
if (sess.getCurrentKey().equals(key)) {
|
||||
return (sess.getLastExpirationDate() + SESSION_TAG_DURATION_MS) - Clock.getInstance().now();
|
||||
long end = sess.getLastExpirationDate();
|
||||
if (end <= 0)
|
||||
return 0;
|
||||
else
|
||||
return end - _context.clock().now();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -203,7 +211,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
sess = getSession(target);
|
||||
}
|
||||
sess.setCurrentKey(key);
|
||||
TagSet set = new TagSet(sessionTags, key);
|
||||
TagSet set = new TagSet(sessionTags, key, _context.clock().now());
|
||||
sess.addTags(set);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Tags delivered to set " + set + " on session " + sess);
|
||||
@ -226,7 +234,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
*
|
||||
*/
|
||||
public void tagsReceived(SessionKey key, Set sessionTags) {
|
||||
TagSet tagSet = new TagSet(sessionTags, key);
|
||||
TagSet tagSet = new TagSet(sessionTags, key, _context.clock().now());
|
||||
for (Iterator iter = sessionTags.iterator(); iter.hasNext();) {
|
||||
SessionTag tag = (SessionTag) iter.next();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -285,9 +293,14 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
|
||||
private void removeSession(PublicKey target) {
|
||||
if (target == null) return;
|
||||
OutboundSession session = null;
|
||||
synchronized (_outboundSessions) {
|
||||
_outboundSessions.remove(target);
|
||||
session = (OutboundSession)_outboundSessions.remove(target);
|
||||
}
|
||||
if ( (session != null) && (_log.shouldLog(Log.WARN)) )
|
||||
_log.warn("Removing session tags with " + session.availableTags() + " available for "
|
||||
+ (session.getLastExpirationDate()-_context.clock().now())
|
||||
+ "ms more", new Exception("Removed by"));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -297,32 +310,47 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
*/
|
||||
public int aggressiveExpire() {
|
||||
int removed = 0;
|
||||
long now = Clock.getInstance().now();
|
||||
Set tagsToDrop = new HashSet(64);
|
||||
long now = _context.clock().now();
|
||||
Set tagsToDrop = null; // new HashSet(64);
|
||||
synchronized (_inboundTagSets) {
|
||||
for (Iterator iter = _inboundTagSets.keySet().iterator(); iter.hasNext();) {
|
||||
SessionTag tag = (SessionTag) iter.next();
|
||||
TagSet ts = (TagSet) _inboundTagSets.get(tag);
|
||||
if (ts.getDate() < now - SESSION_LIFETIME_MAX_MS) {
|
||||
if (tagsToDrop == null)
|
||||
tagsToDrop = new HashSet(4);
|
||||
tagsToDrop.add(tag);
|
||||
}
|
||||
}
|
||||
removed += tagsToDrop.size();
|
||||
for (Iterator iter = tagsToDrop.iterator(); iter.hasNext();)
|
||||
_inboundTagSets.remove(iter.next());
|
||||
if (tagsToDrop != null) {
|
||||
removed += tagsToDrop.size();
|
||||
for (Iterator iter = tagsToDrop.iterator(); iter.hasNext();)
|
||||
_inboundTagSets.remove(iter.next());
|
||||
}
|
||||
}
|
||||
//_log.warn("Expiring tags: [" + tagsToDrop + "]");
|
||||
|
||||
synchronized (_outboundSessions) {
|
||||
Set sessionsToDrop = new HashSet(64);
|
||||
Set sessionsToDrop = null;
|
||||
for (Iterator iter = _outboundSessions.keySet().iterator(); iter.hasNext();) {
|
||||
PublicKey key = (PublicKey) iter.next();
|
||||
OutboundSession sess = (OutboundSession) _outboundSessions.get(key);
|
||||
removed += sess.expireTags();
|
||||
if (sess.getTagSets().size() <= 0) sessionsToDrop.add(key);
|
||||
if (sess.getTagSets().size() <= 0) {
|
||||
if (sessionsToDrop == null)
|
||||
sessionsToDrop = new HashSet(4);
|
||||
sessionsToDrop.add(key);
|
||||
}
|
||||
}
|
||||
if (sessionsToDrop != null) {
|
||||
for (Iterator iter = sessionsToDrop.iterator(); iter.hasNext();) {
|
||||
OutboundSession cur = (OutboundSession)_outboundSessions.remove(iter.next());
|
||||
if ( (cur != null) && (_log.shouldLog(Log.WARN)) )
|
||||
_log.warn("Removing session tags with " + cur.availableTags() + " available for "
|
||||
+ (cur.getLastExpirationDate()-_context.clock().now())
|
||||
+ "ms more", new Exception("Removed by"));
|
||||
}
|
||||
}
|
||||
for (Iterator iter = sessionsToDrop.iterator(); iter.hasNext();)
|
||||
_outboundSessions.remove(iter.next());
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
@ -388,7 +416,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
private List _tagSets;
|
||||
|
||||
public OutboundSession(PublicKey target) {
|
||||
this(target, null, Clock.getInstance().now(), Clock.getInstance().now(), new ArrayList());
|
||||
this(target, null, _context.clock().now(), _context.clock().now(), new ArrayList());
|
||||
}
|
||||
|
||||
OutboundSession(PublicKey target, SessionKey curKey, long established, long lastUsed, List tagSets) {
|
||||
@ -415,6 +443,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
}
|
||||
|
||||
public void setCurrentKey(SessionKey key) {
|
||||
_lastUsed = _context.clock().now();
|
||||
if (_currentKey != null) {
|
||||
if (!_currentKey.equals(key)) {
|
||||
int dropped = 0;
|
||||
@ -445,22 +474,24 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
* Expire old tags, returning the number of tag sets removed
|
||||
*/
|
||||
public int expireTags() {
|
||||
long now = Clock.getInstance().now();
|
||||
Set toRemove = new HashSet(64);
|
||||
long now = _context.clock().now();
|
||||
int removed = 0;
|
||||
synchronized (_tagSets) {
|
||||
for (int i = 0; i < _tagSets.size(); i++) {
|
||||
TagSet set = (TagSet) _tagSets.get(i);
|
||||
if (set.getDate() + SESSION_TAG_DURATION_MS <= now) {
|
||||
toRemove.add(set);
|
||||
_tagSets.remove(i);
|
||||
i--;
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
_tagSets.removeAll(toRemove);
|
||||
}
|
||||
return toRemove.size();
|
||||
return removed;
|
||||
}
|
||||
|
||||
public SessionTag consumeNext() {
|
||||
long now = Clock.getInstance().now();
|
||||
long now = _context.clock().now();
|
||||
_lastUsed = now;
|
||||
synchronized (_tagSets) {
|
||||
while (_tagSets.size() > 0) {
|
||||
TagSet set = (TagSet) _tagSets.get(0);
|
||||
@ -479,10 +510,12 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
|
||||
public int availableTags() {
|
||||
int tags = 0;
|
||||
long now = _context.clock().now();
|
||||
synchronized (_tagSets) {
|
||||
for (int i = 0; i < _tagSets.size(); i++) {
|
||||
TagSet set = (TagSet) _tagSets.get(i);
|
||||
tags += set.getTags().size();
|
||||
if (set.getDate() + SESSION_TAG_DURATION_MS > now)
|
||||
tags += set.getTags().size();
|
||||
}
|
||||
}
|
||||
return tags;
|
||||
@ -498,13 +531,18 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
synchronized (_tagSets) {
|
||||
for (Iterator iter = _tagSets.iterator(); iter.hasNext();) {
|
||||
TagSet set = (TagSet) iter.next();
|
||||
if (set.getDate() > last) last = set.getDate();
|
||||
if ( (set.getDate() > last) && (set.getTags().size() > 0) )
|
||||
last = set.getDate();
|
||||
}
|
||||
}
|
||||
return last + SESSION_TAG_DURATION_MS;
|
||||
if (last > 0)
|
||||
return last + SESSION_TAG_DURATION_MS;
|
||||
else
|
||||
return -1;
|
||||
}
|
||||
|
||||
public void addTags(TagSet set) {
|
||||
_lastUsed = _context.clock().now();
|
||||
synchronized (_tagSets) {
|
||||
_tagSets.add(set);
|
||||
}
|
||||
@ -516,12 +554,12 @@ class TransientSessionKeyManager extends SessionKeyManager {
|
||||
private SessionKey _key;
|
||||
private long _date;
|
||||
|
||||
public TagSet(Set tags, SessionKey key) {
|
||||
public TagSet(Set tags, SessionKey key, long date) {
|
||||
if (key == null) throw new IllegalArgumentException("Missing key");
|
||||
if (tags == null) throw new IllegalArgumentException("Missing tags");
|
||||
_sessionTags = tags;
|
||||
_key = key;
|
||||
_date = Clock.getInstance().now();
|
||||
_date = date;
|
||||
}
|
||||
|
||||
public long getDate() {
|
||||
|
Reference in New Issue
Block a user