diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index 315e1fb2e5..1b8df44cc1 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -97,6 +97,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } public void run() { + boolean closedCleanly = false; try { InputStream in = s.getInputStream(); OutputStream out = s.getOutputStream(); // = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE); @@ -121,6 +122,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL i2ps.close(); t1.join(); t2.join(); + closedCleanly = true; } catch (InterruptedException ex) { if (_log.shouldLog(Log.ERROR)) _log.error("Interrupted", ex); @@ -133,14 +135,21 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } finally { removeRef(); try { - if (s != null) s.close(); + if ( (s != null) && (!closedCleanly) ) + s.close(); + } catch (IOException ex) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Could not close java socket", ex); + } + try { if (i2ps != null) { - i2ps.close(); + if (!closedCleanly) + i2ps.close(); i2ps.setSocketErrorListener(null); } } catch (IOException ex) { if (_log.shouldLog(Log.ERROR)) - _log.error("Could not close socket", ex); + _log.error("Could not close I2PSocket", ex); } } } @@ -179,14 +188,12 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } public void run() { + String from = i2ps.getThisDestination().calculateHash().toBase64().substring(0,6); + String to = i2ps.getPeerDestination().calculateHash().toBase64().substring(0,6); + if (_log.shouldLog(Log.DEBUG)) { - String from = i2ps.getThisDestination().calculateHash().toBase64().substring(0,6); - String to = i2ps.getPeerDestination().calculateHash().toBase64().substring(0,6); - _log.debug(direction + ": Forwarding between " - + from - + " and " - + to); + + from + " and " + to); } ByteArray ba = _cache.acquire(); @@ -214,6 +221,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL out.flush(); // make sure the data get though } } + out.flush(); } catch (SocketException ex) { // this *will* occur when the other threads closes the socket synchronized (finishLock) { @@ -235,6 +243,10 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL //else // _log.warn("You may ignore this", ex); } finally { + if (_log.shouldLog(Log.INFO)) { + _log.info(direction + ": done forwarding between " + + from + " and " + to); + } try { out.close(); in.close(); diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index aab11ad194..6bfe73fdca 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -118,15 +118,23 @@ class I2PSessionImpl2 extends I2PSessionImpl { if ( (tagsSent == null) || (tagsSent.size() <= 0) ) { if (oldTags < 10) { sentTags = createNewTags(50); - //_log.error("** sendBestEffort only had " + oldTags + " adding 50"); - } else if (availTimeLeft < 30 * 1000) { - // if we have > 10 tags, but they expire in under 30 seconds, we want more + if (_log.shouldLog(Log.DEBUG)) + _log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding 50"); + } else if (availTimeLeft < 2 * 60 * 1000) { + // if we have > 10 tags, but they expire in under 2 minutes, we want more sentTags = createNewTags(50); - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding 50 new ones"); //_log.error("** sendBestEffort available time left " + availTimeLeft); } else { - //_log.error("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft); } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("sendBestEffort is sending " + tagsSent.size() + " with " + availTimeLeft + + "ms left, " + oldTags + " tags known and " + + (tag == null ? "no tag" : " a valid tag")); } SessionKey newKey = null; @@ -184,7 +192,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { long afterRemovingSync = _context.clock().now(); boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId() + _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId() + " / " + state.getNonce() + " found = " + found); if (found) { if (_log.shouldLog(Log.INFO)) @@ -210,7 +218,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { Set sentTags = null; if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) { sentTags = createNewTags(50); - } else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 30 * 1000) { + } else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 2 * 60 * 1000) { // if we have > 10 tags, but they expire in under 30 seconds, we want more sentTags = createNewTags(50); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones"); @@ -267,9 +275,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { _sendingStates.remove(state); } long afterRemovingSync = _context.clock().now(); + boolean guaranteed = isGuaranteed(); boolean found = false; boolean accepted = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); - if (isGuaranteed()) + if (guaranteed) found = state.received(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS); else found = accepted; @@ -286,7 +295,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { + ")"); //if (true) // throw new OutOfMemoryError("not really an OOM, but more of jr fucking shit up"); - nackTags(state); + if (guaranteed) + nackTags(state); return false; } @@ -294,19 +304,24 @@ class I2PSessionImpl2 extends I2PSessionImpl { _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId() + " / " + state.getNonce() + " found = " + found); - // WARNING: this will always be false for mode=BestEffort, even though the message may go - // through, causing every datagram to be ElGamal encrypted! - // TODO: Fix this to include support for acks received after the sendMessage completes - if (found) { - if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); - ackTags(state); + // the 'found' value is only useful for mode=Guaranteed, as mode=BestEffort + // doesn't block + if (guaranteed) { + if (found) { + if (_log.shouldLog(Log.INFO)) + _log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with " + + payload.length + " bytes"); + ackTags(state); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with " + + payload.length + " bytes"); + nackTags(state); + } } else { if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with " + _log.info(getPrefix() + "Message send enqueued after " + state.getElapsed() + "ms with " + payload.length + " bytes"); - nackTags(state); } return found; } diff --git a/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java b/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java index 957fd186e3..b385e87af9 100644 --- a/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java @@ -156,7 +156,7 @@ public class PersistentSessionKeyManager extends TransientSessionKeyManager { tag.setData(val); tags.add(tag); } - TagSet ts = new TagSet(tags, key); + TagSet ts = new TagSet(tags, key, _context.clock().now()); ts.setDate(date.getTime()); return ts; } diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index 5a44f2a19b..2222d74194 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -36,6 +36,7 @@ class TransientSessionKeyManager extends SessionKeyManager { private Log _log; private Map _outboundSessions; // PublicKey --> OutboundSession private Map _inboundTagSets; // SessionTag --> TagSet + protected I2PAppContext _context; /** * Let session tags sit around for 10 minutes before expiring them. We can now have such a large @@ -62,6 +63,7 @@ class TransientSessionKeyManager extends SessionKeyManager { public TransientSessionKeyManager(I2PAppContext context) { super(context); _log = context.logManager().getLog(TransientSessionKeyManager.class); + _context = context; _outboundSessions = new HashMap(64); _inboundTagSets = new HashMap(1024); } @@ -116,12 +118,14 @@ class TransientSessionKeyManager extends SessionKeyManager { public SessionKey getCurrentKey(PublicKey target) { OutboundSession sess = getSession(target); if (sess == null) return null; - long now = Clock.getInstance().now(); - if (sess.getEstablishedDate() < now - SESSION_LIFETIME_MAX_MS) { + long now = _context.clock().now(); + if (sess.getLastUsedDate() < now - SESSION_LIFETIME_MAX_MS) { if (_log.shouldLog(Log.INFO)) _log.info("Expiring old session key established on " + new Date(sess.getEstablishedDate()) - + " with target " + target); + + " but not used for " + + (now-sess.getLastUsedDate()) + + "ms with target " + target); return null; } return sess.getCurrentKey(); @@ -185,7 +189,11 @@ class TransientSessionKeyManager extends SessionKeyManager { OutboundSession sess = getSession(target); if (sess == null) { return 0; } if (sess.getCurrentKey().equals(key)) { - return (sess.getLastExpirationDate() + SESSION_TAG_DURATION_MS) - Clock.getInstance().now(); + long end = sess.getLastExpirationDate(); + if (end <= 0) + return 0; + else + return end - _context.clock().now(); } return 0; } @@ -203,7 +211,7 @@ class TransientSessionKeyManager extends SessionKeyManager { sess = getSession(target); } sess.setCurrentKey(key); - TagSet set = new TagSet(sessionTags, key); + TagSet set = new TagSet(sessionTags, key, _context.clock().now()); sess.addTags(set); if (_log.shouldLog(Log.DEBUG)) _log.debug("Tags delivered to set " + set + " on session " + sess); @@ -226,7 +234,7 @@ class TransientSessionKeyManager extends SessionKeyManager { * */ public void tagsReceived(SessionKey key, Set sessionTags) { - TagSet tagSet = new TagSet(sessionTags, key); + TagSet tagSet = new TagSet(sessionTags, key, _context.clock().now()); for (Iterator iter = sessionTags.iterator(); iter.hasNext();) { SessionTag tag = (SessionTag) iter.next(); if (_log.shouldLog(Log.DEBUG)) @@ -285,9 +293,14 @@ class TransientSessionKeyManager extends SessionKeyManager { private void removeSession(PublicKey target) { if (target == null) return; + OutboundSession session = null; synchronized (_outboundSessions) { - _outboundSessions.remove(target); + session = (OutboundSession)_outboundSessions.remove(target); } + if ( (session != null) && (_log.shouldLog(Log.WARN)) ) + _log.warn("Removing session tags with " + session.availableTags() + " available for " + + (session.getLastExpirationDate()-_context.clock().now()) + + "ms more", new Exception("Removed by")); } /** @@ -297,32 +310,47 @@ class TransientSessionKeyManager extends SessionKeyManager { */ public int aggressiveExpire() { int removed = 0; - long now = Clock.getInstance().now(); - Set tagsToDrop = new HashSet(64); + long now = _context.clock().now(); + Set tagsToDrop = null; // new HashSet(64); synchronized (_inboundTagSets) { for (Iterator iter = _inboundTagSets.keySet().iterator(); iter.hasNext();) { SessionTag tag = (SessionTag) iter.next(); TagSet ts = (TagSet) _inboundTagSets.get(tag); if (ts.getDate() < now - SESSION_LIFETIME_MAX_MS) { + if (tagsToDrop == null) + tagsToDrop = new HashSet(4); tagsToDrop.add(tag); } } - removed += tagsToDrop.size(); - for (Iterator iter = tagsToDrop.iterator(); iter.hasNext();) - _inboundTagSets.remove(iter.next()); + if (tagsToDrop != null) { + removed += tagsToDrop.size(); + for (Iterator iter = tagsToDrop.iterator(); iter.hasNext();) + _inboundTagSets.remove(iter.next()); + } } //_log.warn("Expiring tags: [" + tagsToDrop + "]"); synchronized (_outboundSessions) { - Set sessionsToDrop = new HashSet(64); + Set sessionsToDrop = null; for (Iterator iter = _outboundSessions.keySet().iterator(); iter.hasNext();) { PublicKey key = (PublicKey) iter.next(); OutboundSession sess = (OutboundSession) _outboundSessions.get(key); removed += sess.expireTags(); - if (sess.getTagSets().size() <= 0) sessionsToDrop.add(key); + if (sess.getTagSets().size() <= 0) { + if (sessionsToDrop == null) + sessionsToDrop = new HashSet(4); + sessionsToDrop.add(key); + } + } + if (sessionsToDrop != null) { + for (Iterator iter = sessionsToDrop.iterator(); iter.hasNext();) { + OutboundSession cur = (OutboundSession)_outboundSessions.remove(iter.next()); + if ( (cur != null) && (_log.shouldLog(Log.WARN)) ) + _log.warn("Removing session tags with " + cur.availableTags() + " available for " + + (cur.getLastExpirationDate()-_context.clock().now()) + + "ms more", new Exception("Removed by")); + } } - for (Iterator iter = sessionsToDrop.iterator(); iter.hasNext();) - _outboundSessions.remove(iter.next()); } return removed; } @@ -388,7 +416,7 @@ class TransientSessionKeyManager extends SessionKeyManager { private List _tagSets; public OutboundSession(PublicKey target) { - this(target, null, Clock.getInstance().now(), Clock.getInstance().now(), new ArrayList()); + this(target, null, _context.clock().now(), _context.clock().now(), new ArrayList()); } OutboundSession(PublicKey target, SessionKey curKey, long established, long lastUsed, List tagSets) { @@ -415,6 +443,7 @@ class TransientSessionKeyManager extends SessionKeyManager { } public void setCurrentKey(SessionKey key) { + _lastUsed = _context.clock().now(); if (_currentKey != null) { if (!_currentKey.equals(key)) { int dropped = 0; @@ -445,22 +474,24 @@ class TransientSessionKeyManager extends SessionKeyManager { * Expire old tags, returning the number of tag sets removed */ public int expireTags() { - long now = Clock.getInstance().now(); - Set toRemove = new HashSet(64); + long now = _context.clock().now(); + int removed = 0; synchronized (_tagSets) { for (int i = 0; i < _tagSets.size(); i++) { TagSet set = (TagSet) _tagSets.get(i); if (set.getDate() + SESSION_TAG_DURATION_MS <= now) { - toRemove.add(set); + _tagSets.remove(i); + i--; + removed++; } } - _tagSets.removeAll(toRemove); } - return toRemove.size(); + return removed; } public SessionTag consumeNext() { - long now = Clock.getInstance().now(); + long now = _context.clock().now(); + _lastUsed = now; synchronized (_tagSets) { while (_tagSets.size() > 0) { TagSet set = (TagSet) _tagSets.get(0); @@ -479,10 +510,12 @@ class TransientSessionKeyManager extends SessionKeyManager { public int availableTags() { int tags = 0; + long now = _context.clock().now(); synchronized (_tagSets) { for (int i = 0; i < _tagSets.size(); i++) { TagSet set = (TagSet) _tagSets.get(i); - tags += set.getTags().size(); + if (set.getDate() + SESSION_TAG_DURATION_MS > now) + tags += set.getTags().size(); } } return tags; @@ -498,13 +531,18 @@ class TransientSessionKeyManager extends SessionKeyManager { synchronized (_tagSets) { for (Iterator iter = _tagSets.iterator(); iter.hasNext();) { TagSet set = (TagSet) iter.next(); - if (set.getDate() > last) last = set.getDate(); + if ( (set.getDate() > last) && (set.getTags().size() > 0) ) + last = set.getDate(); } } - return last + SESSION_TAG_DURATION_MS; + if (last > 0) + return last + SESSION_TAG_DURATION_MS; + else + return -1; } public void addTags(TagSet set) { + _lastUsed = _context.clock().now(); synchronized (_tagSets) { _tagSets.add(set); } @@ -516,12 +554,12 @@ class TransientSessionKeyManager extends SessionKeyManager { private SessionKey _key; private long _date; - public TagSet(Set tags, SessionKey key) { + public TagSet(Set tags, SessionKey key, long date) { if (key == null) throw new IllegalArgumentException("Missing key"); if (tags == null) throw new IllegalArgumentException("Missing tags"); _sessionTags = tags; _key = key; - _date = Clock.getInstance().now(); + _date = date; } public long getDate() { diff --git a/history.txt b/history.txt index b5470d74cb..8cdc5b6d5d 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,11 @@ -$Id: history.txt,v 1.75 2004/11/17 14:42:53 jrandom Exp $ +$Id: history.txt,v 1.76 2004/11/19 18:04:27 jrandom Exp $ + +2004-11-21 jrandom + * Destroy ElGamal/AES+SessionTag keys after 15 minutes of inactivity + rather that every 15 minutes, and increase the warning period in which + we refresh tags from 30s to 2 minutes. + * Bugfix for a rare problem closing an I2PTunnel stream where we'd fail + to close the I2PSocket (leaving it to timeout). 2004-11-19 jrandom * Off-by-one fix to the tunnel pool management code, along side some diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 41d14e53eb..d88cdf641a 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.80 $ $Date: 2004/11/17 14:42:53 $"; + public final static String ID = "$Revision: 1.81 $ $Date: 2004/11/19 18:04:27 $"; public final static String VERSION = "0.4.1.4"; - public final static long BUILD = 9; + public final static long BUILD = 10; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);