diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 5d8332e9f..7c4e34d73 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -134,6 +134,29 @@ public class ConnectionPacketHandler { con.getOptions().setWindowSize(newWindowSize); con.setCongestionWindowEnd(newWindowSize + lowest); } + } else { + // received a message that doesn't contain a new ack + + // ehh. cant do this, as we SACK and the acks may be + // received out of order: + // Alice: RECEIVE 2 + // Alice: SEND ack 2 nack 1 + // Alice: RECEIVE 1 + // Alice: SEND ack 2 + // Bob: RECEIVE ack 2 + // Bob: RECEIVE ack 2 nack 1 <-- NOT bad + + /* + if (con.getUnackedPacketsSent() > 0) { + // peer got a dup + int oldSize = con.getOptions().getWindowSize(); + oldSize >>>= 1; + if (oldSize <= 0) + oldSize = 1; + con.getOptions().setWindowSize(oldSize); + return false; + } + */ } } return false; diff --git a/core/java/src/net/i2p/crypto/AESEngine.java b/core/java/src/net/i2p/crypto/AESEngine.java index 3b6b4e9ee..4f56b1ccd 100644 --- a/core/java/src/net/i2p/crypto/AESEngine.java +++ b/core/java/src/net/i2p/crypto/AESEngine.java @@ -56,11 +56,14 @@ public class AESEngine { int padding = ElGamalAESEngine.getPaddingSize(size, paddedSize); byte data[] = new byte[size + padding]; - Hash h = _context.sha().calculateHash(iv); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(iv.length); + Hash h = _context.sha().calculateHash(iv, cache); int cur = 0; System.arraycopy(h.getData(), 0, data, cur, Hash.HASH_LENGTH); cur += Hash.HASH_LENGTH; + _context.sha().cache().release(cache); + DataHelper.toLong(data, cur, 4, payload.length); cur += 4; System.arraycopy(payload, 0, data, cur, payload.length); @@ -83,15 +86,18 @@ public class AESEngine { } int cur = 0; - byte h[] = _context.sha().calculateHash(iv).getData(); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(iv.length); + byte h[] = _context.sha().calculateHash(iv, cache).getData(); for (int i = 0; i < Hash.HASH_LENGTH; i++) { if (decr[i] != h[i]) { _log.error("Hash does not match [key=" + sessionKey + " / iv =" + DataHelper.toString(iv, iv.length) + "]", new Exception("Hash error")); + _context.sha().cache().release(cache); return null; } } cur += Hash.HASH_LENGTH; + _context.sha().cache().release(cache); long len = DataHelper.fromLong(decr, cur, 4); cur += 4; diff --git a/core/java/src/net/i2p/crypto/CryptixAESKeyCache.java b/core/java/src/net/i2p/crypto/CryptixAESKeyCache.java index 780360ed1..c41c72092 100644 --- a/core/java/src/net/i2p/crypto/CryptixAESKeyCache.java +++ b/core/java/src/net/i2p/crypto/CryptixAESKeyCache.java @@ -19,11 +19,10 @@ final class CryptixAESKeyCache { private static final int BC = BLOCKSIZE / 4; private static final int KC = KEYSIZE / 4; + private static final int MAX_KEYS = 64; + public CryptixAESKeyCache() { - _availableKeys = new ArrayList(64); - for (int i = 0; i < 64; i++) { - _availableKeys.add(createNew()); - } + _availableKeys = new ArrayList(MAX_KEYS); } /** @@ -44,7 +43,8 @@ final class CryptixAESKeyCache { */ public final void releaseKey(KeyCacheEntry key) { synchronized (_availableKeys) { - _availableKeys.add(key); + if (_availableKeys.size() < MAX_KEYS) + _availableKeys.add(key); } } diff --git a/core/java/src/net/i2p/crypto/DSAEngine.java b/core/java/src/net/i2p/crypto/DSAEngine.java index 7a1f95ecb..d39a89308 100644 --- a/core/java/src/net/i2p/crypto/DSAEngine.java +++ b/core/java/src/net/i2p/crypto/DSAEngine.java @@ -30,6 +30,7 @@ package net.i2p.crypto; */ import java.math.BigInteger; +import java.util.Arrays; import net.i2p.I2PAppContext; import net.i2p.data.Hash; @@ -201,6 +202,8 @@ public class DSAEngine { H[x] = H0[x]; } int blocks = M0.length / 16; + + int[] W = new int[80]; for (int bl = 0; bl < blocks; bl++) { int a = H[0]; int b = H[1]; @@ -208,8 +211,8 @@ public class DSAEngine { int d = H[3]; int e = H[4]; - int[] W = new int[80]; - + Arrays.fill(W, 0); + for (x = 0; x < 80; x++) { if (x < 16) { W[x] = M0[bl * 16 + x]; diff --git a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java index dc6f19a95..085b91798 100644 --- a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java @@ -66,11 +66,11 @@ public class ElGamalAESEngine { */ public byte[] decrypt(byte data[], PrivateKey targetPrivateKey) throws DataFormatException { if (data == null) { - if (_log.shouldLog(Log.WARN)) _log.warn("Null data being decrypted?"); + if (_log.shouldLog(Log.ERROR)) _log.error("Null data being decrypted?"); return null; } else if (data.length < MIN_ENCRYPTED_SIZE) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Data is less than the minimum size (" + data.length + " < " + MIN_ENCRYPTED_SIZE + ")"); + if (_log.shouldLog(Log.ERROR)) + _log.error("Data is less than the minimum size (" + data.length + " < " + MIN_ENCRYPTED_SIZE + ")"); return null; } @@ -162,9 +162,11 @@ public class ElGamalAESEngine { //_log.debug("Pre IV for decryptNewSession: " + DataHelper.toString(preIV, 32)); //_log.debug("SessionKey for decryptNewSession: " + DataHelper.toString(key.getData(), 32)); - Hash ivHash = _context.sha().calculateHash(preIV); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(preIV.length); + Hash ivHash = _context.sha().calculateHash(preIV, cache); byte iv[] = new byte[16]; System.arraycopy(ivHash.getData(), 0, iv, 0, 16); + _context.sha().cache().release(cache); byte aesDecr[] = decryptAESBlock(data, 514, data.length-514, usedKey, iv, null, foundTags, foundKey); @@ -196,9 +198,11 @@ public class ElGamalAESEngine { SessionKey usedKey, SessionKey foundKey) throws DataFormatException { byte preIV[] = new byte[32]; System.arraycopy(data, 0, preIV, 0, preIV.length); - Hash ivHash = _context.sha().calculateHash(preIV); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(32); + Hash ivHash = _context.sha().calculateHash(preIV, cache); byte iv[] = new byte[16]; System.arraycopy(ivHash.getData(), 0, iv, 0, 16); + _context.sha().cache().release(cache); usedKey.setData(key.getData()); @@ -252,39 +256,48 @@ public class ElGamalAESEngine { Hash readHash = null; List tags = new ArrayList(); - ByteArrayInputStream bais = new ByteArrayInputStream(decrypted); - long numTags = DataHelper.readLong(bais, 2); + //ByteArrayInputStream bais = new ByteArrayInputStream(decrypted); + int cur = 0; + long numTags = DataHelper.fromLong(decrypted, cur, 2); + cur += 2; //_log.debug("# tags: " + numTags); if ((numTags < 0) || (numTags > 65535)) throw new Exception("Invalid number of session tags"); + if (numTags * SessionTag.BYTE_LENGTH > decrypted.length - 2) { + throw new Exception("# tags: " + numTags + " is too many for " + (decrypted.length - 2)); + } for (int i = 0; i < numTags; i++) { - byte tag[] = new byte[32]; - int read = bais.read(tag); - if (read != 32) - throw new Exception("Invalid session tag - # tags: " + numTags + " curTag #: " + i + " read: " - + read); + byte tag[] = new byte[SessionTag.BYTE_LENGTH]; + System.arraycopy(decrypted, cur, tag, 0, SessionTag.BYTE_LENGTH); + cur += SessionTag.BYTE_LENGTH; tags.add(new SessionTag(tag)); } - long len = DataHelper.readLong(bais, 4); + long len = DataHelper.fromLong(decrypted, cur, 4); + cur += 4; //_log.debug("len: " + len); - if ((len < 0) || (len > encryptedLen)) throw new Exception("Invalid size of payload"); - byte hashval[] = new byte[32]; - int read = bais.read(hashval); - if (read != hashval.length) throw new Exception("Invalid size of hash"); + if ((len < 0) || (len > decrypted.length - cur - Hash.HASH_LENGTH - 1)) + throw new Exception("Invalid size of payload (" + len + ", remaining " + (decrypted.length-cur) +")"); + byte hashval[] = new byte[Hash.HASH_LENGTH]; + System.arraycopy(decrypted, cur, hashval, 0, Hash.HASH_LENGTH); + cur += Hash.HASH_LENGTH; readHash = new Hash(); readHash.setData(hashval); - byte flag = (byte) bais.read(); + byte flag = decrypted[cur++]; if (flag == 0x01) { - byte rekeyVal[] = new byte[32]; - read = bais.read(rekeyVal); - if (read != rekeyVal.length) throw new Exception("Invalid size of the rekeyed session key"); + byte rekeyVal[] = new byte[SessionKey.KEYSIZE_BYTES]; + System.arraycopy(decrypted, cur, rekeyVal, 0, SessionKey.KEYSIZE_BYTES); + cur += SessionKey.KEYSIZE_BYTES; newKey = new SessionKey(); newKey.setData(rekeyVal); } byte unencrData[] = new byte[(int) len]; - read = bais.read(unencrData); - if (read != unencrData.length) throw new Exception("Invalid size of the data read"); - Hash calcHash = _context.sha().calculateHash(unencrData); - if (calcHash.equals(readHash)) { + System.arraycopy(decrypted, cur, unencrData, 0, (int)len); + cur += len; + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(unencrData.length); + Hash calcHash = _context.sha().calculateHash(unencrData, cache); + boolean eq = calcHash.equals(readHash); + _context.sha().cache().release(cache); + + if (eq) { // everything matches. w00t. foundTags.addAll(tags); if (newKey != null) foundKey.setData(newKey.getData()); @@ -391,9 +404,11 @@ public class ElGamalAESEngine { } //_log.debug("ElGamal encrypted length: " + elgEncr.length + " elGamal source length: " + elgSrc.toByteArray().length); - Hash ivHash = _context.sha().calculateHash(preIV); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(preIV.length); + Hash ivHash = _context.sha().calculateHash(preIV, cache); byte iv[] = new byte[16]; System.arraycopy(ivHash.getData(), 0, iv, 0, 16); + _context.sha().cache().release(cache); byte aesEncr[] = encryptAESBlock(data, key, iv, tagsForDelivery, newKey, paddedSize); //_log.debug("AES encrypted length: " + aesEncr.length); @@ -427,9 +442,11 @@ public class ElGamalAESEngine { //_log.debug("Pre IV for encryptExistingSession (aka tag): " + currentTag.toString()); //_log.debug("SessionKey for encryptNewSession: " + DataHelper.toString(key.getData(), 32)); - Hash ivHash = _context.sha().calculateHash(rawTag); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(rawTag.length); + Hash ivHash = _context.sha().calculateHash(rawTag, cache); byte iv[] = new byte[16]; System.arraycopy(ivHash.getData(), 0, iv, 0, 16); + _context.sha().cache().release(cache); byte aesEncr[] = encryptAESBlock(data, key, iv, tagsForDelivery, newKey, paddedSize, SessionTag.BYTE_LENGTH); // that prepended SessionTag.BYTE_LENGTH bytes at the beginning of the buffer @@ -484,9 +501,11 @@ public class ElGamalAESEngine { DataHelper.toLong(aesData, cur, 4, data.length); cur += 4; //_log.debug("data length: " + data.length); - Hash hash = _context.sha().calculateHash(data); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(data.length); + Hash hash = _context.sha().calculateHash(data, cache); System.arraycopy(hash.getData(), 0, aesData, cur, Hash.HASH_LENGTH); cur += Hash.HASH_LENGTH; + _context.sha().cache().release(cache); //_log.debug("hash of data: " + DataHelper.toString(hash.getData(), 32)); if (newKey == null) { @@ -536,4 +555,33 @@ public class ElGamalAESEngine { return numPadding; } + public static void main(String args[]) { + I2PAppContext ctx = new I2PAppContext(); + ElGamalAESEngine e = new ElGamalAESEngine(ctx); + Object kp[] = ctx.keyGenerator().generatePKIKeypair(); + PublicKey pubKey = (PublicKey)kp[0]; + PrivateKey privKey = (PrivateKey)kp[1]; + SessionKey sessionKey = ctx.keyGenerator().generateSessionKey(); + for (int i = 0; i < 10; i++) { + try { + Set tags = new HashSet(5); + if (i == 0) { + for (int j = 0; j < 5; j++) + tags.add(new SessionTag(true)); + } + byte encrypted[] = e.encrypt("blah".getBytes(), pubKey, sessionKey, tags, 1024); + byte decrypted[] = e.decrypt(encrypted, privKey); + if ("blah".equals(new String(decrypted))) { + System.out.println("equal on " + i); + } else { + System.out.println("NOT equal on " + i + ": " + new String(decrypted)); + break; + } + ctx.sessionKeyManager().tagsDelivered(pubKey, sessionKey, tags); + } catch (Exception ee) { + ee.printStackTrace(); + break; + } + } + } } \ No newline at end of file diff --git a/core/java/src/net/i2p/crypto/ElGamalEngine.java b/core/java/src/net/i2p/crypto/ElGamalEngine.java index b9c351388..4a6a9470c 100644 --- a/core/java/src/net/i2p/crypto/ElGamalEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalEngine.java @@ -99,8 +99,10 @@ public class ElGamalEngine { byte d2[] = new byte[1+Hash.HASH_LENGTH+data.length]; d2[0] = (byte)0xFF; - Hash hash = _context.sha().calculateHash(data); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(data.length); + Hash hash = _context.sha().calculateHash(data, cache); System.arraycopy(hash.getData(), 0, d2, 1, Hash.HASH_LENGTH); + _context.sha().cache().release(cache); System.arraycopy(data, 0, d2, 1+Hash.HASH_LENGTH, data.length); long t0 = _context.clock().now(); @@ -180,21 +182,23 @@ public class ElGamalEngine { for (i = 0; i < val.length; i++) if (val[i] != (byte) 0x00) break; - ByteArrayInputStream bais = new ByteArrayInputStream(val, i, val.length - i); - Hash hash = new Hash(); - byte rv[] = null; - try { - bais.read(); // skip first byte - hash.readBytes(bais); - rv = new byte[val.length - i - 1 - 32]; - bais.read(rv); - } catch (Exception e) { - if (_log.shouldLog(Log.ERROR)) _log.error("Internal error reading value", e); + //ByteArrayInputStream bais = new ByteArrayInputStream(val, i, val.length - i); + byte hashData[] = new byte[Hash.HASH_LENGTH]; + System.arraycopy(val, i + 1, hashData, 0, Hash.HASH_LENGTH); + Hash hash = new Hash(hashData); + int payloadLen = val.length - i - 1 - Hash.HASH_LENGTH; + if (payloadLen < 0) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Decrypted data is too small (" + (val.length - i)+ ")"); return null; } + byte rv[] = new byte[payloadLen]; + System.arraycopy(val, i + 1 + Hash.HASH_LENGTH, rv, 0, rv.length); - Hash calcHash = _context.sha().calculateHash(rv); + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(payloadLen); + Hash calcHash = _context.sha().calculateHash(rv, cache); boolean ok = calcHash.equals(hash); + _context.sha().cache().release(cache); long end = _context.clock().now(); @@ -211,7 +215,7 @@ public class ElGamalEngine { return rv; } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Doesn't match hash [calc=" + calcHash + " sent hash=" + hash + "]\ndata = " + _log.debug("Doesn't match hash [sent hash=" + hash + "]\ndata = " + Base64.encode(rv), new Exception("Doesn't match")); return null; } diff --git a/core/java/src/net/i2p/crypto/SHA256EntryCache.java b/core/java/src/net/i2p/crypto/SHA256EntryCache.java new file mode 100644 index 000000000..4b6abba37 --- /dev/null +++ b/core/java/src/net/i2p/crypto/SHA256EntryCache.java @@ -0,0 +1,238 @@ +package net.i2p.crypto; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import net.i2p.I2PAppContext; +import net.i2p.data.Hash; + +/** + * Cache the objects used in SHA256Generator's calculate method to reduce + * memory churn. The CacheEntry should be held onto as long as the + * data referenced in it is needed (which often is only one or two lines + * of code) + * + */ +public final class SHA256EntryCache { + private static final int ONE_KB = 0; + private static final int FOUR_KB = 1; + private static final int EIGHT_KB = 2; + private static final int SIXTEEN_KB = 3; + private static final int THIRTYTWO_KB = 4; + private static final int FOURTYEIGHT_KB = 5; + private static final int LARGER = 6; + /** + * Array of Lists of free CacheEntry objects, indexed + * by the payload size they are capable of handling + */ + private List _available[] = new List[6]; + /** count up how often we use the cache for each size */ + private long _used[] = new long[7]; + private int _sizes[] = new int[] { 1024,4*1024,8*1024,16*1024,32*1024,48*1024 }; + + /** no more than 32 at each size level */ + private static final int MAX_CACHED = 64; + + public SHA256EntryCache() { + for (int i = 0; i < _available.length; i++) { + _available[i] = new ArrayList(MAX_CACHED); + //for (int j = 0; j < MAX_CACHED; j++) + // _available[i].add(new CacheEntry(_sizes[i])); + } + } + + /** + * Get the next available structure, either from the cache or a brand new one + * + */ + public final CacheEntry acquire(int payload) { + int entrySize = getBucket(payload); + switch (entrySize) { + case 1024: + _used[ONE_KB]++; + synchronized (_available[ONE_KB]) { + if (_available[ONE_KB].size() > 0) { + return (CacheEntry)_available[ONE_KB].remove(0); + } + } + break; + case 4*1024: + _used[FOUR_KB]++; + synchronized (_available[FOUR_KB]) { + if (_available[FOUR_KB].size() > 0) { + return (CacheEntry)_available[FOUR_KB].remove(0); + } + } + break; + case 8*1024: + _used[EIGHT_KB]++; + synchronized (_available[EIGHT_KB]) { + if (_available[EIGHT_KB].size() > 0) { + return (CacheEntry)_available[EIGHT_KB].remove(0); + } + } + break; + case 16*1024: + _used[SIXTEEN_KB]++; + synchronized (_available[SIXTEEN_KB]) { + if (_available[SIXTEEN_KB].size() > 0) { + return (CacheEntry)_available[SIXTEEN_KB].remove(0); + } + } + break; + case 32*1024: + _used[THIRTYTWO_KB]++; + synchronized (_available[THIRTYTWO_KB]) { + if (_available[THIRTYTWO_KB].size() > 0) { + return (CacheEntry)_available[THIRTYTWO_KB].remove(0); + } + } + break; + case 48*1024: + _used[FOURTYEIGHT_KB]++; + synchronized (_available[FOURTYEIGHT_KB]) { + if (_available[FOURTYEIGHT_KB].size() > 0) { + return (CacheEntry)_available[FOURTYEIGHT_KB].remove(0); + } + } + break; + default: + _used[LARGER]++; + // not for the bucket, so make it exact + return new CacheEntry(payload); + } + return new CacheEntry(entrySize); + } + + /** + * Put this structure back onto the available cache for reuse + * + */ + public final void release(CacheEntry entry) { + entry.reset(); + if (false) return; + switch (entry.bucket) { + case 1024: + synchronized (_available[ONE_KB]) { + if (_available[ONE_KB].size() < MAX_CACHED) { + _available[ONE_KB].add(entry); + } + } + return; + case 4*1024: + synchronized (_available[FOUR_KB]) { + if (_available[FOUR_KB].size() < MAX_CACHED) { + _available[FOUR_KB].add(entry); + } + } + return; + case 8*1024: + synchronized (_available[EIGHT_KB]) { + if (_available[EIGHT_KB].size() < MAX_CACHED) { + _available[EIGHT_KB].add(entry); + } + } + return; + case 16*1024: + synchronized (_available[SIXTEEN_KB]) { + if (_available[SIXTEEN_KB].size() < MAX_CACHED) { + _available[SIXTEEN_KB].add(entry); + } + } + return; + case 32*1024: + synchronized (_available[THIRTYTWO_KB]) { + if (_available[THIRTYTWO_KB].size() < MAX_CACHED) { + _available[THIRTYTWO_KB].add(entry); + } + } + return; + case 48*1024: + synchronized (_available[FOURTYEIGHT_KB]) { + if (_available[FOURTYEIGHT_KB].size() < MAX_CACHED) { + _available[FOURTYEIGHT_KB].add(entry); + } + } + return; + } + } + + /** + * all the data alloc'ed in a calculateHash call + */ + public static final class CacheEntry { + byte hashbytes[]; + int W[]; + int M0[]; + int H[]; + Hash hash; + int wordlength; + int bucket; + + public CacheEntry(int payload) { + wordlength = SHA256Generator.getWordlength(payload); + bucket = payload; + hashbytes = new byte[32]; + M0 = new int[wordlength]; + W = new int[64]; + H = new int[8]; + hash = new Hash(); + hash.setData(hashbytes); + } + + public final void reset() { + Arrays.fill(hashbytes, (byte)0x0); + Arrays.fill(M0, (byte)0x0); + Arrays.fill(W, (byte)0x0); + Arrays.fill(H, (byte)0x0); + } + } + + private static final int getBucket(int payload) { + if (payload <= 1024) + return 1024; + else if (payload <= 4*1024) + return 4*1024; + else if (payload <= 8*1024) + return 8*1024; + else if (payload <= 16*1024) + return 16*1024; + else if (payload <= 32*1024) + return 32*1024; + else if (payload <= 48*1024) + return 48*1024; + else + return payload; + } + + public static void main(String args[]) { + I2PAppContext ctx = new I2PAppContext(); + for (int i = 1; i < 20000; i+=2) { + test(ctx, i); + } + } + private static void test(I2PAppContext ctx, int size) { + System.out.print("Size = " + size); + for (int i = 0; i < 2; i++) { + byte orig[] = new byte[size]; + ctx.random().nextBytes(orig); + CacheEntry cache = ctx.sha().cache().acquire(orig.length); + try { + Hash h = ctx.sha().calculateHash(orig, cache); + Hash h2 = ctx.sha().calculateHash(orig); + boolean eq = h.equals(h2); + ctx.sha().cache().release(cache); + if (eq) { + System.out.print("."); + } else { + System.out.print("ERROR " + i); + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + System.out.println(); + } +} diff --git a/core/java/src/net/i2p/crypto/SHA256Generator.java b/core/java/src/net/i2p/crypto/SHA256Generator.java index 749747c50..38a84aaa8 100644 --- a/core/java/src/net/i2p/crypto/SHA256Generator.java +++ b/core/java/src/net/i2p/crypto/SHA256Generator.java @@ -29,6 +29,7 @@ package net.i2p.crypto; * POSSIBILITY OF SUCH DAMAGE. */ +import java.util.Arrays; import net.i2p.I2PAppContext; import net.i2p.data.Hash; @@ -38,15 +39,18 @@ import net.i2p.data.Hash; * * @author thecrypto,jrandom */ -public class SHA256Generator { +public final class SHA256Generator { + private final SHA256EntryCache _cache = new SHA256EntryCache(); public SHA256Generator(I2PAppContext context) { // nop } - public static SHA256Generator getInstance() { + public static final SHA256Generator getInstance() { return I2PAppContext.getGlobalContext().sha(); } + + public final SHA256EntryCache cache() { return _cache; } - static int[] K = { 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5, + static final int[] K = { 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5, 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, 0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da, 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967, @@ -55,16 +59,49 @@ public class SHA256Generator { 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3, 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2}; - static int[] H0 = { 0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19}; + static final int[] H0 = { 0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19}; + public static final int getWordlength(int sourceLength) { + long length = sourceLength * 8; + int k = 448 - (int) ((length + 1) % 512); + if (k < 0) { + k += 512; + } + int padbytes = k / 8; + int rv = sourceLength / 4 + padbytes / 4 + 3; + return rv; + } + + private final SHA256EntryCache.CacheEntry getNewEntry(int payloadSize) { + return new SHA256EntryCache.CacheEntry(payloadSize); + } + /** Calculate the SHA-256 has of the source * @param source what to hash * @return hash of the source */ - public Hash calculateHash(byte[] source) { - return calculateHash(source, 0, source.length); + public final Hash calculateHash(byte[] source) { + byte rv[] = new byte[Hash.HASH_LENGTH]; + SHA256EntryCache.CacheEntry cache = _cache.acquire(source.length); + Hash hash = calculateHash(source, 0, source.length, cache); + System.arraycopy(hash.getData(), 0, rv, 0, Hash.HASH_LENGTH); + _cache.release(cache); + return new Hash(rv); } - public Hash calculateHash(byte[] source, int start, int len) { + public final Hash calculateHash(byte[] source, SHA256EntryCache.CacheEntry cache) { + return calculateHash(source, 0, source.length, cache); + } + public final Hash calculateHash(byte[] source, int start, int len) { + byte rv[] = new byte[Hash.HASH_LENGTH]; + SHA256EntryCache.CacheEntry cache = _cache.acquire(len); + Hash hash = calculateHash(source, start, len, cache); + System.arraycopy(hash.getData(), 0, rv, 0, Hash.HASH_LENGTH); + _cache.release(cache); + return new Hash(rv); + } + public final Hash calculateHash(byte[] source, int start, int len, SHA256EntryCache.CacheEntry cache) { + if (cache == null) + return calculateHash(source, start, len); long length = len * 8; int k = 448 - (int) ((length + 1) % 512); if (k < 0) { @@ -72,7 +109,10 @@ public class SHA256Generator { } int padbytes = k / 8; int wordlength = len / 4 + padbytes / 4 + 3; - int[] M0 = new int[wordlength]; + if (wordlength != getWordlength(len)) + throw new RuntimeException("len = " + len + " wordlength = " + wordlength + + " getWordlength = " + getWordlength(len)); + int[] M0 = cache.M0; int wordcount = 0; int x = 0; for (x = 0; x < (len / 4) * 4; x += 4) { @@ -104,11 +144,12 @@ public class SHA256Generator { } M0[wordlength - 2] = (int) (length >>> 32); M0[wordlength - 1] = (int) (length); - int[] H = new int[8]; + int[] H = cache.H; for (x = 0; x < 8; x++) { H[x] = H0[x]; } - int blocks = M0.length / 16; + int blocks = wordlength / 16; + int[] W = cache.W; for (int bl = 0; bl < blocks; bl++) { int a = H[0]; int b = H[1]; @@ -118,7 +159,7 @@ public class SHA256Generator { int f = H[5]; int g = H[6]; int h = H[7]; - int[] W = new int[64]; + Arrays.fill(W, 0); for (x = 0; x < 64; x++) { if (x < 16) { W[x] = M0[bl * 16 + x]; @@ -147,51 +188,49 @@ public class SHA256Generator { H[6] = add(g, H[6]); H[7] = add(h, H[7]); } - byte[] hashbytes = new byte[32]; + byte[] hashbytes = cache.hashbytes; for (x = 0; x < 8; x++) { hashbytes[x * 4] = (byte) (H[x] << 0 >>> 24); hashbytes[x * 4 + 1] = (byte) (H[x] << 8 >>> 24); hashbytes[x * 4 + 2] = (byte) (H[x] << 16 >>> 24); hashbytes[x * 4 + 3] = (byte) (H[x] << 24 >>> 24); } - Hash hash = new Hash(); - hash.setData(hashbytes); - return hash; + return cache.hash; } - private static int Ch(int x, int y, int z) { + private static final int Ch(int x, int y, int z) { return (x & y) ^ (~x & z); } - private static int Maj(int x, int y, int z) { + private static final int Maj(int x, int y, int z) { return (x & y) ^ (x & z) ^ (y & z); } - private static int ROTR(int x, int n) { + private static final int ROTR(int x, int n) { return (x >>> n) | (x << 32 - n); } - private static int e0(int x) { + private static final int e0(int x) { return ROTR(x, 2) ^ ROTR(x, 13) ^ ROTR(x, 22); } - private static int e1(int x) { + private static final int e1(int x) { return ROTR(x, 6) ^ ROTR(x, 11) ^ ROTR(x, 25); } - private static int SHR(int x, int n) { + private static final int SHR(int x, int n) { return x >>> n; } - private static int o0(int x) { + private static final int o0(int x) { return ROTR(x, 7) ^ ROTR(x, 18) ^ SHR(x, 3); } - private static int o1(int x) { + private static final int o1(int x) { return ROTR(x, 17) ^ ROTR(x, 19) ^ SHR(x, 10); } - private static int add(int x, int y) { + private static final int add(int x, int y) { return x + y; } } \ No newline at end of file diff --git a/core/java/src/net/i2p/data/TunnelId.java b/core/java/src/net/i2p/data/TunnelId.java index 133069360..415c293f0 100644 --- a/core/java/src/net/i2p/data/TunnelId.java +++ b/core/java/src/net/i2p/data/TunnelId.java @@ -71,6 +71,11 @@ public class TunnelId extends DataStructureImpl { if (_tunnelId < 0) throw new DataFormatException("Invalid tunnel ID: " + _tunnelId); DataHelper.writeLong(out, 4, _tunnelId); } + public int writeBytes(byte target[], int offset) throws DataFormatException { + if (_tunnelId < 0) throw new DataFormatException("Invalid tunnel ID: " + _tunnelId); + DataHelper.toLong(target, offset, 4, _tunnelId); + return 4; + } public boolean equals(Object obj) { if ( (obj == null) || !(obj instanceof TunnelId)) diff --git a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java index 230de8836..d0f60436e 100644 --- a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java @@ -12,6 +12,7 @@ package net.i2p.data.i2cp; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; @@ -75,6 +76,15 @@ public class MessagePayloadMessage extends I2CPMessageImpl { } protected byte[] doWriteMessage() throws I2CPMessageException, IOException { + throw new RuntimeException("go away, we dont want any"); + } + + /** + * Write out the full message to the stream, including the 4 byte size and 1 + * byte type header. + * + */ + public void writeMessage(OutputStream out) throws I2CPMessageException, IOException { if (_sessionId == null) throw new I2CPMessageException("Unable to write out the message, as the session ID has not been defined"); if (_messageId == null) @@ -82,15 +92,17 @@ public class MessagePayloadMessage extends I2CPMessageImpl { if (_payload == null) throw new I2CPMessageException("Unable to write out the message, as the payload has not been defined"); - ByteArrayOutputStream os = new ByteArrayOutputStream(512); + int size = 2 + 4 + 4 + _payload.getSize(); try { - _sessionId.writeBytes(os); - _messageId.writeBytes(os); - _payload.writeBytes(os); + DataHelper.writeLong(out, 4, size); + DataHelper.writeLong(out, 1, getType()); + DataHelper.writeLong(out, 2, _sessionId.getSessionId()); + DataHelper.writeLong(out, 4, _messageId.getMessageId()); + DataHelper.writeLong(out, 4, _payload.getSize()); + out.write(_payload.getEncryptedData()); } catch (DataFormatException dfe) { - throw new I2CPMessageException("Error writing out the message data", dfe); + throw new I2CPMessageException("Unable to write the message length or type", dfe); } - return os.toByteArray(); } public int getType() { diff --git a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java index 357bff1d9..c39f98967 100644 --- a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java @@ -75,6 +75,19 @@ public class SendMessageMessage extends I2CPMessageImpl { } protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException { + if (true) throw new IllegalStateException("wtf, do not run me"); + } + + /** + * Read the body into the data structures + * + */ + public void readMessage(InputStream in, int length, int type) throws I2CPMessageException, IOException { + if (type != getType()) + throw new I2CPMessageException("Invalid message type (found: " + type + " supported: " + getType() + + " class: " + getClass().getName() + ")"); + if (length < 0) throw new IOException("Negative payload size"); + try { _sessionId = new SessionId(); _sessionId.readBytes(in); diff --git a/core/java/src/net/i2p/util/Log.java b/core/java/src/net/i2p/util/Log.java index ff14968cb..74aea9237 100644 --- a/core/java/src/net/i2p/util/Log.java +++ b/core/java/src/net/i2p/util/Log.java @@ -25,6 +25,7 @@ import net.i2p.data.DataHelper; */ public class Log { private Class _class; + private String _className; private String _name; private int _minPriority; private LogScope _scope; @@ -90,6 +91,7 @@ public class Log { Log(LogManager manager, Class cls, String name) { _manager = manager; _class = cls; + _className = cls != null ? cls.getName() : null; _name = name; _minPriority = DEBUG; _scope = new LogScope(name, cls); @@ -161,33 +163,37 @@ public class Log { } public String getName() { - if (_class != null) return _class.getName(); + if (_className != null) return _className; return _name; } public Object getScope() { return _scope; } + static String getScope(String name, Class cls) { + if ( (name == null) && (cls == null) ) return "f00"; + if (cls == null) return name; + if (name == null) return cls.getName(); + return name + "" + cls.getName(); + } private static final class LogScope { private String _scopeName; private Class _scopeClass; + private String _scopeCache; public LogScope(String name, Class cls) { _scopeName = name; _scopeClass = cls; + _scopeCache = getScope(name, cls); } public int hashCode() { - if (_scopeClass != null) - return _scopeClass.hashCode(); - else if (_scopeName != null) - return _scopeName.hashCode(); - else - return 42; + return _scopeCache.hashCode(); } public boolean equals(Object obj) { if (obj == null) throw new NullPointerException("Null object scope?"); if (obj instanceof LogScope) { LogScope s = (LogScope)obj; - return DataHelper.eq(s._scopeName, _scopeName) && - DataHelper.eq(s._scopeClass, _scopeClass); + return s._scopeCache.equals(_scopeCache); + } else if (obj instanceof String) { + return obj.equals(_scopeCache); } return false; diff --git a/core/java/src/net/i2p/util/LogManager.java b/core/java/src/net/i2p/util/LogManager.java index f51b21662..dfcab56dc 100644 --- a/core/java/src/net/i2p/util/LogManager.java +++ b/core/java/src/net/i2p/util/LogManager.java @@ -141,14 +141,11 @@ public class LogManager { public Log getLog(Class cls, String name) { Log rv = null; synchronized (_logs) { - Log newLog = new Log(this, cls, name); - if (_logs.containsKey(newLog.getScope())) { - Log oldLog = (Log)_logs.get(newLog.getScope()); - rv = oldLog; - //_log.error("Duplicate log creation for " + cls); - } else { - _logs.put(newLog.getScope(), newLog); - rv = newLog; + String scope = Log.getScope(name, cls); + rv = (Log)_logs.get(scope); + if (rv == null) { + rv = new Log(this, cls, name); + _logs.put(scope, rv); } } updateLimit(rv); @@ -483,14 +480,16 @@ public class LogManager { List limits = getLimits(log); LogLimit max = null; LogLimit notMax = null; - for (int i = 0; i < limits.size(); i++) { - LogLimit cur = (LogLimit) limits.get(i); - if (max == null) - max = cur; - else { - if (cur.getRootName().length() > max.getRootName().length()) { - notMax = max; + if (limits != null) { + for (int i = 0; i < limits.size(); i++) { + LogLimit cur = (LogLimit) limits.get(i); + if (max == null) max = cur; + else { + if (cur.getRootName().length() > max.getRootName().length()) { + notMax = max; + max = cur; + } } } } @@ -504,11 +503,15 @@ public class LogManager { } private List getLimits(Log log) { - ArrayList limits = new ArrayList(4); + ArrayList limits = null; // new ArrayList(4); synchronized (_limits) { for (int i = 0; i < _limits.size(); i++) { LogLimit limit = (LogLimit)_limits.get(i); - if (limit.matches(log)) limits.add(limit); + if (limit.matches(log)) { + if (limits == null) + limits = new ArrayList(4); + limits.add(limit); + } } } return limits; diff --git a/core/java/test/net/i2p/crypto/SHA256Bench.java b/core/java/test/net/i2p/crypto/SHA256Bench.java index 05527a0e2..7c1fce418 100644 --- a/core/java/test/net/i2p/crypto/SHA256Bench.java +++ b/core/java/test/net/i2p/crypto/SHA256Bench.java @@ -45,6 +45,17 @@ public class SHA256Bench { long maxMed = 0; long minLong = 0; long maxLong = 0; + + long shorttime1 = 0; + long medtime1 = 0; + long longtime1 = 0; + long minShort1 = 0; + long maxShort1 = 0; + long minMed1 = 0; + long maxMed1 = 0; + long minLong1 = 0; + long maxLong1 = 0; + byte[] smess = new String("abc").getBytes(); StringBuffer buf = new StringBuffer(); for (int x = 0; x < 10*1024; x++) { @@ -56,6 +67,12 @@ public class SHA256Bench { buf.append("a"); } byte[] lmess = buf.toString().getBytes(); + + SHA256EntryCache cache = new SHA256EntryCache(); + SHA256EntryCache.CacheEntry scache = cache.acquire(smess.length); + SHA256EntryCache.CacheEntry mcache = cache.acquire(mmess.length); + SHA256EntryCache.CacheEntry lcache = cache.acquire(lmess.length); + // warm up the engines SHA256Generator.getInstance().calculateHash(smess); SHA256Generator.getInstance().calculateHash(mmess); @@ -63,16 +80,42 @@ public class SHA256Bench { // now do it for (int x = 0; x < times; x++) { long startshort = System.currentTimeMillis(); - Hash s = SHA256Generator.getInstance().calculateHash(smess); + boolean cacheOnly = false; + // no caching + Hash s = cacheOnly ? null : SHA256Generator.getInstance().calculateHash(smess); long endshortstartmed = System.currentTimeMillis(); - Hash m = SHA256Generator.getInstance().calculateHash(mmess); + Hash m = cacheOnly ? null : SHA256Generator.getInstance().calculateHash(mmess); long endmedstartlong = System.currentTimeMillis(); - Hash l = SHA256Generator.getInstance().calculateHash(lmess); + Hash l = cacheOnly ? null : SHA256Generator.getInstance().calculateHash(lmess); long endlong = System.currentTimeMillis(); - System.out.print("."); + + // now do it with caching + Hash s1 = SHA256Generator.getInstance().calculateHash(smess, 0, smess.length, scache); + long endshortstartmed1 = System.currentTimeMillis(); + Hash m1 = SHA256Generator.getInstance().calculateHash(mmess, 0, mmess.length, mcache); + long endmedstartlong1 = System.currentTimeMillis(); + Hash l1 = SHA256Generator.getInstance().calculateHash(lmess, 0, lmess.length, lcache); + long endlong1 = System.currentTimeMillis(); + + if (cacheOnly) { + // dont verify + } else { + if (!s.equals(s1) || !m.equals(m1) || !l.equals(l1)) { + System.err.println("*ERR* match failed on " + x + + "s="+ s.equals(s1) + " m=" + m.equals(m1) + + " l=" + l.equals(l1)); + return; + } + } + + //System.out.print("."); shorttime += endshortstartmed - startshort; medtime += endmedstartlong - endshortstartmed; longtime += endlong - endmedstartlong; + + shorttime1 += endshortstartmed1 - endlong; + medtime1 += endmedstartlong1 - endshortstartmed1; + longtime1 += endlong1 - endmedstartlong1; if ((minShort == 0) && (minMed == 0) && (minLong == 0) ) { minShort = endshortstartmed - startshort; @@ -81,6 +124,13 @@ public class SHA256Bench { maxMed = endmedstartlong - endshortstartmed; minLong = endlong - endmedstartlong; maxLong = endlong - endmedstartlong; + + minShort1 = endshortstartmed1 - endlong; + maxShort1 = endshortstartmed1 - endlong; + minMed1 = endmedstartlong1 - endshortstartmed1; + maxMed1 = endmedstartlong1 - endshortstartmed1; + minLong1 = endlong1 - endmedstartlong1; + maxLong1 = endlong1 - endmedstartlong1; } else { if (minShort > endshortstartmed - startshort) minShort = endshortstartmed - startshort; if (maxShort < endshortstartmed - startshort) maxShort = endshortstartmed - startshort; @@ -88,12 +138,23 @@ public class SHA256Bench { if (maxMed < endmedstartlong - endshortstartmed) maxMed = endmedstartlong - endshortstartmed; if (minLong > endlong - endmedstartlong) minLong = endlong - endmedstartlong; if (maxLong < endlong - endmedstartlong) maxLong = endlong - endmedstartlong; + + if (minShort1 > endshortstartmed1 - endlong) minShort1 = endshortstartmed1 - endlong; + if (maxShort1 < endshortstartmed1 - endlong) maxShort1 = endshortstartmed1 - endlong; + if (minMed1 > endmedstartlong1 - endshortstartmed1) minMed1 = endmedstartlong - endshortstartmed; + if (maxMed1 < endmedstartlong1 - endshortstartmed1) maxMed1 = endmedstartlong - endshortstartmed; + if (minLong1 > endlong1 - endmedstartlong1) minLong1 = endlong1 - endmedstartlong1; + if (maxLong1 < endlong1 - endmedstartlong1) maxLong1 = endlong1 - endmedstartlong1; } } System.out.println(); System.out.println("Short Message Time Average : " + (shorttime/times) + "\ttotal: " + shorttime + "\tmin: " + minShort + "\tmax: " + maxShort + "\tBps: " + (shorttime == 0 ? "NaN" : ""+(times*smess.length)/shorttime)); - System.out.println("Medium Message Time Average : " + (medtime/times) + "\ttotal: " + medtime + "\tmin: " + minMed + "\tmax: " + maxMed + "\tBps: " + (times*mmess.length*1000)/medtime); - System.out.println("Long Message Time Average : " + (longtime/times) + "\ttotal: " + longtime + "\tmin: " + minLong + "\tmax: " + maxLong + "\tBps: " + (times*lmess.length*1000)/longtime); + System.out.println("Medium Message Time Average : " + (medtime/times) + "\ttotal: " + medtime + "\tmin: " + minMed + "\tmax: " + maxMed + "\tBps: " + (medtime == 0 ? "NaN" : ""+(times*mmess.length*1000)/medtime)); + System.out.println("Long Message Time Average : " + (longtime/times) + "\ttotal: " + longtime + "\tmin: " + minLong + "\tmax: " + maxLong + "\tBps: " + (longtime == 0 ? "NaN" : "" + (times*lmess.length*1000)/longtime)); + + System.out.println("Short Message Time (cache) : " + (shorttime1/times) + "\ttotal: " + shorttime1 + "\tmin: " + minShort1 + "\tmax: " + maxShort1 + "\tBps: " + (shorttime1 == 0 ? "NaN" : ""+(times*smess.length)/shorttime1)); + System.out.println("Medium Message Time (cache) : " + (medtime1/times) + "\ttotal: " + medtime1 + "\tmin: " + minMed1 + "\tmax: " + maxMed1 + "\tBps: " + (times*mmess.length*1000)/medtime1); + System.out.println("Long Message Time (cache) : " + (longtime1/times) + "\ttotal: " + longtime1 + "\tmin: " + minLong1 + "\tmax: " + maxLong1 + "\tBps: " + (times*lmess.length*1000)/longtime1); } } diff --git a/history.txt b/history.txt index ba6aa69c7..e5ce9ad1e 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,16 @@ -$Id: history.txt,v 1.58 2004/10/29 21:40:52 jrandom Exp $ +$Id: history.txt,v 1.59 2004/10/30 18:44:01 jrandom Exp $ + +2004-11-01 jrandom + * Increase the tunnel test timeout rapidly if our tunnels are failing. + * Honor message expirations for some tunnel jobs that were prematurely + expired. + * Streamline memory usage with temporary object caches and more efficient + serialization for SHA256 calculation, logging, and both I2CP and I2NP + message handling. + * Fix some situations where we forward messages too eagerly. For a + request at the tunnel endpoint, if the tunnel is inbound and the target + is remote, honor the message by tunnel routing the data rather than + sending it directly to the requested location. 2004-10-30 jrandom * Cache the temporary objects used in the AES encryption/decryption diff --git a/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java b/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java index eefbb0263..94368d92f 100644 --- a/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java +++ b/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java @@ -126,6 +126,61 @@ public class DeliveryInstructions extends DataStructureImpl { } } + public int readBytes(byte data[], int offset) throws DataFormatException { + int cur = offset; + long flags = DataHelper.fromLong(data, cur, 1); + cur++; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Read flags: " + flags + " mode: " + flagMode(flags)); + + if (flagEncrypted(flags)) { + byte kd[] = new byte[SessionKey.KEYSIZE_BYTES]; + System.arraycopy(data, cur, kd, 0, SessionKey.KEYSIZE_BYTES); + cur += SessionKey.KEYSIZE_BYTES; + setEncryptionKey(new SessionKey(kd)); + setEncrypted(true); + } else { + setEncrypted(false); + } + + setDeliveryMode(flagMode(flags)); + switch (flagMode(flags)) { + case FLAG_MODE_LOCAL: + break; + case FLAG_MODE_DESTINATION: + byte destHash[] = new byte[Hash.HASH_LENGTH]; + System.arraycopy(data, cur, destHash, 0, Hash.HASH_LENGTH); + cur += Hash.HASH_LENGTH; + setDestination(new Hash(destHash)); + break; + case FLAG_MODE_ROUTER: + byte routerHash[] = new byte[Hash.HASH_LENGTH]; + System.arraycopy(data, cur, routerHash, 0, Hash.HASH_LENGTH); + cur += Hash.HASH_LENGTH; + setRouter(new Hash(routerHash)); + break; + case FLAG_MODE_TUNNEL: + byte tunnelRouterHash[] = new byte[Hash.HASH_LENGTH]; + System.arraycopy(data, cur, tunnelRouterHash, 0, Hash.HASH_LENGTH); + cur += Hash.HASH_LENGTH; + setRouter(new Hash(tunnelRouterHash)); + setTunnelId(new TunnelId(DataHelper.fromLong(data, cur, 4))); + cur += 4; + break; + } + + if (flagDelay(flags)) { + long delay = DataHelper.fromLong(data, cur, 4); + cur += 4; + setDelayRequested(true); + setDelaySeconds(delay); + } else { + setDelayRequested(false); + } + return cur - offset; + } + + private boolean flagEncrypted(long flags) { return (0 != (flags & FLAG_ENCRYPTED)); } diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessage.java b/router/java/src/net/i2p/data/i2np/I2NPMessage.java index fd967c114..45f0dd87e 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessage.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessage.java @@ -35,7 +35,8 @@ public interface I2NPMessage extends DataStructure { * @throws IOException if there is a problem reading from the stream */ public int readBytes(InputStream in, int type, byte buffer[]) throws I2NPMessageException, IOException; - + public int readBytes(byte data[], int type, int offset) throws I2NPMessageException, IOException; + /** * Read the body into the data structures, after the initial type byte and * the uniqueId / expiration, using the current class's format as defined by diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java b/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java index fcd5dd323..08c3925c4 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java @@ -18,7 +18,7 @@ import net.i2p.data.DataHelper; import net.i2p.util.Log; /** - * Handle messages from router to router + * Handle messages from router to router. This class is NOT threadsafe * */ public class I2NPMessageHandler { @@ -28,6 +28,8 @@ public class I2NPMessageHandler { private long _lastReadEnd; private int _lastSize; private byte _messageBuffer[]; + private I2NPMessage _lastRead; + public I2NPMessageHandler(I2PAppContext context) { _context = context; _log = context.logManager().getLog(I2NPMessageHandler.class); @@ -68,6 +70,51 @@ public class I2NPMessageHandler { throw new I2NPMessageException("Error reading the message", dfe); } } + + /** clear the last message read from a byte array with an offset */ + public I2NPMessage lastRead() { + I2NPMessage rv = _lastRead; + _lastRead = null; + return rv; + } + + /** + * Read an I2NPMessage from the stream and return the fully populated object. + * + * @throws IOException if there is an IO problem reading from the stream + * @throws I2NPMessageException if there is a problem handling the particular + * message - if it is an unknown type or has improper formatting, etc. + */ + public I2NPMessage readMessage(byte data[]) throws IOException, I2NPMessageException { + int offset = readMessage(data, 0); + return lastRead(); + } + public int readMessage(byte data[], int offset) throws IOException, I2NPMessageException { + int cur = offset; + int type = (int)DataHelper.fromLong(data, cur, 1); + cur++; + _lastReadBegin = System.currentTimeMillis(); + I2NPMessage msg = createMessage(type); + if (msg == null) + throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message"); + try { + _lastSize = msg.readBytes(data, type, cur); + cur += _lastSize; + } catch (IOException ioe) { + throw ioe; + } catch (I2NPMessageException ime) { + throw ime; + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error reading the stream", e); + throw new IOException("Unknown error reading the " + msg.getClass().getName() + + ": " + e.getMessage()); + } + _lastReadEnd = System.currentTimeMillis(); + _lastRead = msg; + return cur - offset; + } + public long getLastReadTime() { return _lastReadEnd - _lastReadBegin; } public int getLastSize() { return _lastSize; } diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java index dcf55eb56..aef8c7c24 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java @@ -15,6 +15,7 @@ import java.io.OutputStream; import java.util.Date; import net.i2p.I2PAppContext; +import net.i2p.crypto.SHA256EntryCache; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.DataStructureImpl; @@ -74,8 +75,11 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM cur += numRead; } - Hash calc = _context.sha().calculateHash(buffer, 0, size); - if (!calc.equals(h)) + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(size); + Hash calc = _context.sha().calculateHash(buffer, 0, size, cache); + boolean eq = calc.equals(h); + _context.sha().cache().release(cache); + if (!eq) throw new I2NPMessageException("Hash does not match"); long start = _context.clock().now(); @@ -90,6 +94,46 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM throw new I2NPMessageException("Error reading the message header", dfe); } } + public int readBytes(byte data[], int type, int offset) throws I2NPMessageException, IOException { + if (type < 0) { + type = (int)DataHelper.fromLong(data, offset, 1); + offset++; + } + _uniqueId = DataHelper.fromLong(data, offset, 4); + offset += 4; + _expiration = DataHelper.fromDate(data, offset); + offset += DataHelper.DATE_LENGTH; + int size = (int)DataHelper.fromLong(data, offset, 2); + offset += 2; + Hash h = new Hash(); + byte hdata[] = new byte[Hash.HASH_LENGTH]; + System.arraycopy(data, offset, hdata, 0, Hash.HASH_LENGTH); + offset += Hash.HASH_LENGTH; + h.setData(hdata); + + if (offset + size > data.length) + throw new I2NPMessageException("Payload is too short [" + + "data.len=" + data.length + + " offset=" + offset + + " wanted=" + size + "]"); + + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(size); + Hash calc = _context.sha().calculateHash(data, offset, size, cache); + boolean eq = calc.equals(h); + _context.sha().cache().release(cache); + if (!eq) + throw new I2NPMessageException("Hash does not match"); + + long start = _context.clock().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Reading bytes: type = " + type + " / uniqueId : " + _uniqueId + " / expiration : " + _expiration); + readMessage(data, offset, size, type); + long time = _context.clock().now() - start; + if (time > 50) + _context.statManager().addRateData("i2np.readTime", time, time); + return size + Hash.HASH_LENGTH + 1 + 4 + DataHelper.DATE_LENGTH; + } + public void writeBytes(OutputStream out) throws DataFormatException, IOException { int size = getMessageSize(); if (size < 47) throw new DataFormatException("Unable to build the message"); diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java index e5b8279a2..602853b03 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java @@ -117,12 +117,14 @@ public class I2NPMessageReader { while (!_context.throttle().acceptNetworkMessage()) { try { Thread.sleep(500 + _context.random().nextInt(512)); } catch (InterruptedException ie) {} } + // do read try { I2NPMessage msg = _handler.readMessage(_stream); if (msg != null) { long msToRead = _handler.getLastReadTime(); int bytesRead = _handler.getLastSize(); + msToRead += injectLag(bytesRead); _listener.messageReceived(I2NPMessageReader.this, msg, msToRead, bytesRead); } } catch (I2NPMessageException ime) { @@ -145,5 +147,33 @@ public class I2NPMessageReader { } // boom bye bye bad bwoy } + + private final long injectLag(int size) { + if (true) { + return 0; + } else { + boolean shouldLag = _context.random().nextInt(1000) > size; + long readLag = getReadLag(); + if (readLag > 0) { + long lag = _context.random().nextLong(readLag); + if (lag > 0) { + try { Thread.sleep(lag); } catch (InterruptedException ie) {} + return lag; + } else { + return 0; + } + } else { + return 0; + } + } + } + + private final long getReadLag() { + try { + return Long.parseLong(_context.getProperty("router.injectLagMs", "0")); + } catch (NumberFormatException nfe) { + return 0; + } + } } } diff --git a/router/java/src/net/i2p/router/MessageValidator.java b/router/java/src/net/i2p/router/MessageValidator.java index e9f17b968..27fe1f5e5 100644 --- a/router/java/src/net/i2p/router/MessageValidator.java +++ b/router/java/src/net/i2p/router/MessageValidator.java @@ -34,7 +34,7 @@ public class MessageValidator { public MessageValidator(RouterContext context) { _log = context.logManager().getLog(MessageValidator.class); _receivedIdExpirations = new TreeMap(); - _receivedIds = new HashSet(32*1024); + _receivedIds = new HashSet(256); _receivedIdLock = new Object(); _context = context; } @@ -108,12 +108,16 @@ public class MessageValidator { * */ private void locked_cleanReceivedIds(long now) { - Set toRemoveIds = new HashSet(4); - Set toRemoveDates = new HashSet(4); + Set toRemoveIds = null; + Set toRemoveDates = null; for (Iterator iter = _receivedIdExpirations.keySet().iterator(); iter.hasNext(); ) { Long date = (Long)iter.next(); if (date.longValue() <= now) { // no need to keep track of things in the past + if (toRemoveIds == null) { + toRemoveIds = new HashSet(2); + toRemoveDates = new HashSet(2); + } toRemoveDates.add(date); toRemoveIds.add(_receivedIdExpirations.get(date)); } else { @@ -122,12 +126,16 @@ public class MessageValidator { break; } } - for (Iterator iter = toRemoveDates.iterator(); iter.hasNext(); ) - _receivedIdExpirations.remove(iter.next()); - for (Iterator iter = toRemoveIds.iterator(); iter.hasNext(); ) - _receivedIds.remove(iter.next()); - if (_log.shouldLog(Log.INFO)) - _log.info("Cleaned out " + toRemoveDates.size() + " expired messageIds, leaving " + _receivedIds.size() + " remaining"); + if (toRemoveIds != null) { + for (Iterator iter = toRemoveDates.iterator(); iter.hasNext(); ) + _receivedIdExpirations.remove(iter.next()); + for (Iterator iter = toRemoveIds.iterator(); iter.hasNext(); ) + _receivedIds.remove(iter.next()); + if (_log.shouldLog(Log.INFO)) + _log.info("Cleaned out " + toRemoveDates.size() + + " expired messageIds, leaving " + + _receivedIds.size() + " remaining"); + } } void shutdown() { diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 2b5d0e2ed..7394a8944 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.64 $ $Date: 2004/10/29 21:40:52 $"; + public final static String ID = "$Revision: 1.65 $ $Date: 2004/10/30 18:44:01 $"; public final static String VERSION = "0.4.1.3"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java b/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java index 0b5ee85fe..e56bca34f 100644 --- a/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java @@ -166,9 +166,11 @@ public class HandleGarlicMessageJob extends JobImpl { return; } long sendExpiration = clove.getExpiration().getTime(); + // if the clove targets something remote, tunnel route it + boolean sendDirect = false; _handler.handleMessage(clove.getInstructions(), clove.getData(), clove.getCloveId(), _from, _fromHash, - sendExpiration, FORWARD_PRIORITY); + sendExpiration, FORWARD_PRIORITY, sendDirect); } public void dropped() { diff --git a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java index 4d2b4d207..6727ecd5d 100644 --- a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java @@ -11,6 +11,7 @@ package net.i2p.router.message; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.List; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; @@ -37,6 +38,7 @@ import net.i2p.router.ReplyJob; import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; +import net.i2p.router.TunnelSelectionCriteria; import net.i2p.util.Log; public class HandleTunnelMessageJob extends JobImpl { @@ -333,20 +335,57 @@ public class HandleTunnelMessageJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending on to requested tunnel " + id.getTunnelId() + " on router " + router.toBase64()); - TunnelMessage msg = new TunnelMessage(getContext()); - msg.setTunnelId(id); - msg.setData(body.toByteArray()); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, FORWARD_TIMEOUT, FORWARD_PRIORITY)); - String bodyType = body.getClass().getName(); - getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + int timeoutMs = (int)(body.getMessageExpiration().getTime() - getContext().clock().now()); + if (timeoutMs < 5000) + timeoutMs = FORWARD_TIMEOUT; + + TunnelInfo curInfo = getContext().tunnelManager().getTunnelInfo(_message.getTunnelId()); + if (curInfo.getTunnelId().getType() != TunnelId.TYPE_INBOUND) { + // we are not processing a request at the end of an inbound tunnel, so + // there's no reason to hide our location. honor the request directly + + TunnelMessage msg = new TunnelMessage(getContext()); + msg.setTunnelId(id); + msg.setData(body.toByteArray()); + + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, timeoutMs, FORWARD_PRIORITY)); + + String bodyType = body.getClass().getName(); + getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + } else { + // the instructions request that we forward a message remotely from + // the hidden location. honor it by sending it out a tunnel + TunnelId outTunnelId = selectOutboundTunnelId(); + if (outTunnelId == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No outbound tunnels available to forward the message, dropping it"); + return; + } + getContext().jobQueue().addJob(new SendTunnelMessageJob(getContext(), body, outTunnelId, router, id, + null, null, null, null, timeoutMs, FORWARD_PRIORITY)); + } + } + + private TunnelId selectOutboundTunnelId() { + TunnelSelectionCriteria criteria = new TunnelSelectionCriteria(); + criteria.setMinimumTunnelsRequired(1); + criteria.setMaximumTunnelsRequired(1); + List ids = getContext().tunnelManager().selectOutboundTunnelIds(criteria); + if ( (ids == null) || (ids.size() <= 0) ) + return null; + else + return (TunnelId)ids.get(0); } private void sendToRouter(Hash router, I2NPMessage body) { // TODO: we may want to send it via a tunnel later on, but for now, direct will do. if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending on to requested router " + router.toBase64()); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, FORWARD_TIMEOUT, FORWARD_PRIORITY)); + int timeoutMs = (int)(body.getMessageExpiration().getTime() - getContext().clock().now()); + if (timeoutMs < 5000) + timeoutMs = FORWARD_TIMEOUT; + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, timeoutMs, FORWARD_PRIORITY)); } private void sendToLocal(I2NPMessage body) { @@ -390,7 +429,7 @@ public class HandleTunnelMessageJob extends JobImpl { private I2NPMessage getBody(byte body[]) { try { - return _handler.readMessage(new ByteArrayInputStream(body)); + return _handler.readMessage(body); // new ByteArrayInputStream(body)); } catch (I2NPMessageException ime) { if (_log.shouldLog(Log.ERROR)) _log.error("Error parsing the message body", ime); diff --git a/router/java/src/net/i2p/router/message/MessageHandler.java b/router/java/src/net/i2p/router/message/MessageHandler.java index 27a968b6c..08c7b2843 100644 --- a/router/java/src/net/i2p/router/message/MessageHandler.java +++ b/router/java/src/net/i2p/router/message/MessageHandler.java @@ -10,6 +10,8 @@ package net.i2p.router.message; import java.io.ByteArrayOutputStream; +import java.util.List; + import net.i2p.data.Hash; import net.i2p.data.Payload; import net.i2p.data.RouterIdentity; @@ -22,6 +24,7 @@ import net.i2p.router.ClientMessage; import net.i2p.router.InNetMessage; import net.i2p.router.MessageReceptionInfo; import net.i2p.router.RouterContext; +import net.i2p.router.TunnelSelectionCriteria; import net.i2p.util.Log; /** @@ -40,7 +43,7 @@ class MessageHandler { public void handleMessage(DeliveryInstructions instructions, I2NPMessage message, long replyId, RouterIdentity from, Hash fromHash, - long expiration, int priority) { + long expiration, int priority, boolean sendDirect) { switch (instructions.getDeliveryMode()) { case DeliveryInstructions.DELIVERY_MODE_LOCAL: _log.debug("Instructions for LOCAL DELIVERY"); @@ -75,7 +78,7 @@ class MessageHandler { _log.debug("Instructions for TUNNEL DELIVERY to" + instructions.getTunnelId().getTunnelId() + " on " + instructions.getRouter().toBase64()); - handleTunnel(instructions, expiration, message, priority); + handleTunnel(instructions, expiration, message, priority, sendDirect); break; default: _log.error("Message has instructions that are not yet implemented: mode = " + instructions.getDeliveryMode()); @@ -111,7 +114,7 @@ class MessageHandler { _context.jobQueue().addJob(j); } - private void handleTunnel(DeliveryInstructions instructions, long expiration, I2NPMessage message, int priority) { + private void handleTunnel(DeliveryInstructions instructions, long expiration, I2NPMessage message, int priority, boolean direct) { Hash to = instructions.getRouter(); long timeoutMs = expiration - _context.clock().now(); TunnelId tunnelId = instructions.getTunnelId(); @@ -131,20 +134,44 @@ class MessageHandler { } } - if (_log.shouldLog(Log.INFO)) - _log.info("Handle " + message.getClass().getName() + " to send to remote tunnel " - + tunnelId.getTunnelId() + " on router " + to.toBase64()); - TunnelMessage msg = new TunnelMessage(_context); - msg.setData(message.toByteArray()); - msg.setTunnelId(tunnelId); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Placing message of type " + message.getClass().getName() - + " into the new tunnel message bound for " + tunnelId.getTunnelId() - + " on " + to.toBase64()); - _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority)); - - String bodyType = message.getClass().getName(); - _context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + if (direct) { + if (_log.shouldLog(Log.INFO)) + _log.info("Handle " + message.getClass().getName() + " to send to remote tunnel " + + tunnelId.getTunnelId() + " on router " + to.toBase64()); + TunnelMessage msg = new TunnelMessage(_context); + msg.setData(message.toByteArray()); + msg.setTunnelId(tunnelId); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Placing message of type " + message.getClass().getName() + + " into the new tunnel message bound for " + tunnelId.getTunnelId() + + " on " + to.toBase64()); + _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority)); + + String bodyType = message.getClass().getName(); + _context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + } else { + // we received a message with instructions to send it somewhere, but we shouldn't + // expose where we are in the process of honoring it. so, send it out a tunnel + TunnelId outTunnelId = selectOutboundTunnelId(); + if (outTunnelId == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No outbound tunnels available to forward the message, dropping it"); + return; + } + _context.jobQueue().addJob(new SendTunnelMessageJob(_context, message, outTunnelId, to, tunnelId, + null, null, null, null, timeoutMs, priority)); + } + } + + private TunnelId selectOutboundTunnelId() { + TunnelSelectionCriteria criteria = new TunnelSelectionCriteria(); + criteria.setMinimumTunnelsRequired(1); + criteria.setMaximumTunnelsRequired(1); + List ids = _context.tunnelManager().selectOutboundTunnelIds(criteria); + if ( (ids == null) || (ids.size() <= 0) ) + return null; + else + return (TunnelId)ids.get(0); } private void handleLocalDestination(DeliveryInstructions instructions, I2NPMessage message, Hash fromHash) { diff --git a/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java b/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java index bd04844c3..599d6e913 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java @@ -75,6 +75,7 @@ class TestTunnelJob extends JobImpl { private final static long DEFAULT_TEST_TIMEOUT = 10*1000; // 10 seconds for a test to succeed private final static long DEFAULT_MINIMUM_TEST_TIMEOUT = 5*1000; // 5 second min + private final static long MAXIMUM_TEST_TIMEOUT = 60*1000; // 60 second max private final static int TEST_PRIORITY = 100; /** @@ -86,7 +87,7 @@ class TestTunnelJob extends JobImpl { if (rs != null) { Rate r = rs.getRate(10*60*1000); if (r != null) { - if (r.getLifetimeEventCount() > 0) { + if (r.getLifetimeEventCount() > 10) { if (r.getLastEventCount() <= 0) rv = (long)(r.getLifetimeAverageValue() * getTunnelTestDeviationLimit()); else @@ -94,9 +95,28 @@ class TestTunnelJob extends JobImpl { } } } - long min = getMinimumTestTimeout(); - if (rv < min) - rv = min; + + // lets back off if we're failing + rs = getContext().statManager().getRate("tunnel.failAfterTime"); + if (rs != null) { + Rate r = rs.getRate(5*60*1000); + if (r != null) { + long failures = r.getLastEventCount() + r.getCurrentEventCount(); + if (failures > 0) { + rv <<= failures; + if (_log.shouldLog(Log.WARN)) + _log.warn("Tunnels are failing (" + failures + "), so set the timeout to " + rv); + } + } + } + + if (rv > MAXIMUM_TEST_TIMEOUT) { + rv = MAXIMUM_TEST_TIMEOUT; + } else { + long min = getMinimumTestTimeout(); + if (rv < min) + rv = min; + } return rv; } @@ -143,10 +163,11 @@ class TestTunnelJob extends JobImpl { TunnelInfo inboundInfo = _pool.getTunnelInfo(_secondaryId); inboundInfo.setLastTested(getContext().clock().now()); - + + long timeout = getTunnelTestTimeout(); TestFailedJob failureJob = new TestFailedJob(); - MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId()); - SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY); + MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId(), timeout); + SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(timeout), failureJob, selector, timeout, TEST_PRIORITY); getContext().jobQueue().addJob(testJob); } @@ -169,9 +190,10 @@ class TestTunnelJob extends JobImpl { TunnelInfo outboundInfo = _pool.getTunnelInfo(_secondaryId); outboundInfo.setLastTested(getContext().clock().now()); + long timeout = getTunnelTestTimeout(); TestFailedJob failureJob = new TestFailedJob(); - MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId()); - SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY); + MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId(), timeout); + SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(timeout), failureJob, selector, timeout, TEST_PRIORITY); getContext().jobQueue().addJob(j); } @@ -255,9 +277,11 @@ class TestTunnelJob extends JobImpl { private class TestSuccessfulJob extends JobImpl implements ReplyJob { private DeliveryStatusMessage _msg; - public TestSuccessfulJob() { + private long _timeout; + public TestSuccessfulJob(long timeout) { super(TestTunnelJob.this.getContext()); _msg = null; + _timeout = timeout; } public String getName() { return "Tunnel Test Successful"; } @@ -267,7 +291,7 @@ class TestTunnelJob extends JobImpl { _log.info("Test of tunnel " + _primaryId+ " successfull after " + time + "ms waiting for " + _nonce); - if (time > getTunnelTestTimeout()) { + if (time > _timeout) { return; // the test failed job should already have run } @@ -305,11 +329,11 @@ class TestTunnelJob extends JobImpl { private long _tunnelId; private boolean _found; private long _expiration; - public TestMessageSelector(long id, long tunnelId) { + public TestMessageSelector(long id, long tunnelId, long timeoutMs) { _id = id; _tunnelId = tunnelId; _found = false; - _expiration = getContext().clock().now() + getTunnelTestTimeout(); + _expiration = getContext().clock().now() + timeoutMs; if (_log.shouldLog(Log.DEBUG)) _log.debug("the expiration while testing tunnel " + tunnelId + " waiting for nonce " + id + ": " + new Date(_expiration));