+ Inbound TCP connection configuration:
+ Externally reachable hostname or IP address:
+ " />
+ (dyndns and the like are fine)
+ Externally reachable TCP port:
+ " />
+
You do not need to allow inbound TCP connections - outbound connections work with no
+ configuration. However, if you want to receive inbound TCP connections, you must poke a hole
+ in your NAT or firewall for unsolicited TCP connections. If you specify the wrong IP address or
+ hostname, or do not properly configure your NAT or firewall, your network performance will degrade
+ substantially. When in doubt, leave the hostname and port number blank.
+
Note: changing this setting will terminate all of your connections and effectively
+ restart your router.
+
Dynamic Router Keys:
/>
diff --git a/core/java/src/gnu/crypto/hash/BaseHashStandalone.java b/core/java/src/gnu/crypto/hash/BaseHashStandalone.java
new file mode 100644
index 0000000000..6cfab4cc0f
--- /dev/null
+++ b/core/java/src/gnu/crypto/hash/BaseHashStandalone.java
@@ -0,0 +1,198 @@
+package gnu.crypto.hash;
+
+// ----------------------------------------------------------------------------
+// $Id: BaseHashStandalone.java,v 1.1 2006/02/26 16:30:59 jrandom Exp $
+//
+// Copyright (C) 2001, 2002, Free Software Foundation, Inc.
+//
+// This file is part of GNU Crypto.
+//
+// GNU Crypto is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2, or (at your option)
+// any later version.
+//
+// GNU Crypto is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; see the file COPYING. If not, write to the
+//
+// Free Software Foundation Inc.,
+// 51 Franklin Street, Fifth Floor,
+// Boston, MA 02110-1301
+// USA
+//
+// Linking this library statically or dynamically with other modules is
+// making a combined work based on this library. Thus, the terms and
+// conditions of the GNU General Public License cover the whole
+// combination.
+//
+// As a special exception, the copyright holders of this library give
+// you permission to link this library with independent modules to
+// produce an executable, regardless of the license terms of these
+// independent modules, and to copy and distribute the resulting
+// executable under terms of your choice, provided that you also meet,
+// for each linked independent module, the terms and conditions of the
+// license of that module. An independent module is a module which is
+// not derived from or based on this library. If you modify this
+// library, you may extend this exception to your version of the
+// library, but you are not obligated to do so. If you do not wish to
+// do so, delete this exception statement from your version.
+// ----------------------------------------------------------------------------
+
+/**
+ *
A base abstract class to facilitate hash implementations.
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class BaseHashStandalone implements IMessageDigestStandalone {
+
+ // Constants and variables
+ // -------------------------------------------------------------------------
+
+ /** The canonical name prefix of the hash. */
+ protected String name;
+
+ /** The hash (output) size in bytes. */
+ protected int hashSize;
+
+ /** The hash (inner) block size in bytes. */
+ protected int blockSize;
+
+ /** Number of bytes processed so far. */
+ protected long count;
+
+ /** Temporary input buffer. */
+ protected byte[] buffer;
+
+ // Constructor(s)
+ // -------------------------------------------------------------------------
+
+ /**
+ *
Trivial constructor for use by concrete subclasses.
+ *
+ * @param name the canonical name prefix of this instance.
+ * @param hashSize the block size of the output in bytes.
+ * @param blockSize the block size of the internal transform.
+ */
+ protected BaseHashStandalone(String name, int hashSize, int blockSize) {
+ super();
+
+ this.name = name;
+ this.hashSize = hashSize;
+ this.blockSize = blockSize;
+ this.buffer = new byte[blockSize];
+
+ resetContext();
+ }
+
+ // Class methods
+ // -------------------------------------------------------------------------
+
+ // Instance methods
+ // -------------------------------------------------------------------------
+
+ // IMessageDigestStandalone interface implementation ---------------------------------
+
+ public String name() {
+ return name;
+ }
+
+ public int hashSize() {
+ return hashSize;
+ }
+
+ public int blockSize() {
+ return blockSize;
+ }
+
+ public void update(byte b) {
+ // compute number of bytes still unhashed; ie. present in buffer
+ int i = (int)(count % blockSize);
+ count++;
+ buffer[i] = b;
+ if (i == (blockSize - 1)) {
+ transform(buffer, 0);
+ }
+ }
+
+ public void update(byte[] b) {
+ update(b, 0, b.length);
+ }
+
+ public void update(byte[] b, int offset, int len) {
+ int n = (int)(count % blockSize);
+ count += len;
+ int partLen = blockSize - n;
+ int i = 0;
+
+ if (len >= partLen) {
+ System.arraycopy(b, offset, buffer, n, partLen);
+ transform(buffer, 0);
+ for (i = partLen; i + blockSize - 1 < len; i+= blockSize) {
+ transform(b, offset + i);
+ }
+ n = 0;
+ }
+
+ if (i < len) {
+ System.arraycopy(b, offset + i, buffer, n, len - i);
+ }
+ }
+
+ public byte[] digest() {
+ byte[] tail = padBuffer(); // pad remaining bytes in buffer
+ update(tail, 0, tail.length); // last transform of a message
+ byte[] result = getResult(); // make a result out of context
+
+ reset(); // reset this instance for future re-use
+
+ return result;
+ }
+
+ public void reset() { // reset this instance for future re-use
+ count = 0L;
+ for (int i = 0; i < blockSize; ) {
+ buffer[i++] = 0;
+ }
+
+ resetContext();
+ }
+
+ // methods to be implemented by concrete subclasses ------------------------
+
+ public abstract Object clone();
+
+ public abstract boolean selfTest();
+
+ /**
+ *
Returns the byte array to use as padding before completing a hash
+ * operation.
+ *
+ * @return the bytes to pad the remaining bytes in the buffer before
+ * completing a hash operation.
+ */
+ protected abstract byte[] padBuffer();
+
+ /**
+ *
Constructs the result from the contents of the current context.
+ *
+ * @return the output of the completed hash operation.
+ */
+ protected abstract byte[] getResult();
+
+ /** Resets the instance for future re-use. */
+ protected abstract void resetContext();
+
+ /**
+ *
The block digest transformation per se.
+ *
+ * @param in the blockSize long block, as an array of bytes to digest.
+ * @param offset the index where the data to digest is located within the
+ * input buffer.
+ */
+ protected abstract void transform(byte[] in, int offset);
+}
diff --git a/core/java/src/gnu/crypto/hash/IMessageDigestStandalone.java b/core/java/src/gnu/crypto/hash/IMessageDigestStandalone.java
new file mode 100644
index 0000000000..79fbab2020
--- /dev/null
+++ b/core/java/src/gnu/crypto/hash/IMessageDigestStandalone.java
@@ -0,0 +1,141 @@
+package gnu.crypto.hash;
+
+// ----------------------------------------------------------------------------
+// $Id: IMessageDigestStandalone.java,v 1.1 2006/02/26 16:30:59 jrandom Exp $
+//
+// Copyright (C) 2001, 2002, Free Software Foundation, Inc.
+//
+// This file is part of GNU Crypto.
+//
+// GNU Crypto is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2, or (at your option)
+// any later version.
+//
+// GNU Crypto is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; see the file COPYING. If not, write to the
+//
+// Free Software Foundation Inc.,
+// 51 Franklin Street, Fifth Floor,
+// Boston, MA 02110-1301
+// USA
+//
+// Linking this library statically or dynamically with other modules is
+// making a combined work based on this library. Thus, the terms and
+// conditions of the GNU General Public License cover the whole
+// combination.
+//
+// As a special exception, the copyright holders of this library give
+// you permission to link this library with independent modules to
+// produce an executable, regardless of the license terms of these
+// independent modules, and to copy and distribute the resulting
+// executable under terms of your choice, provided that you also meet,
+// for each linked independent module, the terms and conditions of the
+// license of that module. An independent module is a module which is
+// not derived from or based on this library. If you modify this
+// library, you may extend this exception to your version of the
+// library, but you are not obligated to do so. If you do not wish to
+// do so, delete this exception statement from your version.
+// ----------------------------------------------------------------------------
+
+/**
+ *
The basic visible methods of any hash algorithm.
+ *
+ *
A hash (or message digest) algorithm produces its output by iterating a
+ * basic compression function on blocks of data.
+ *
+ * @return the canonical name of this instance.
+ */
+ String name();
+
+ /**
+ *
Returns the output length in bytes of this message digest algorithm.
+ *
+ * @return the output length in bytes of this message digest algorithm.
+ */
+ int hashSize();
+
+ /**
+ *
Returns the algorithm's (inner) block size in bytes.
+ *
+ * @return the algorithm's inner block size in bytes.
+ */
+ int blockSize();
+
+ /**
+ *
Continues a message digest operation using the input byte.
+ *
+ * @param b the input byte to digest.
+ */
+ void update(byte b);
+
+ /**
+ *
Continues a message digest operation, by filling the buffer, processing
+ * data in the algorithm's HASH_SIZE-bit block(s), updating the context and
+ * count, and buffering the remaining bytes in buffer for the next
+ * operation.
+ *
+ * @param in the input block.
+ */
+ void update(byte[] in);
+
+ /**
+ *
Continues a message digest operation, by filling the buffer, processing
+ * data in the algorithm's HASH_SIZE-bit block(s), updating the context and
+ * count, and buffering the remaining bytes in buffer for the next
+ * operation.
+ *
+ * @param in the input block.
+ * @param offset start of meaningful bytes in input block.
+ * @param length number of bytes, in input block, to consider.
+ */
+ void update(byte[] in, int offset, int length);
+
+ /**
+ *
Completes the message digest by performing final operations such as
+ * padding and resetting the instance.
+ *
+ * @return the array of bytes representing the hash value.
+ */
+ byte[] digest();
+
+ /**
+ *
Resets the current context of this instance clearing any eventually cached
+ * intermediary values.
+ */
+ void reset();
+
+ /**
+ *
A basic test. Ensures that the digest of a pre-determined message is equal
+ * to a known pre-computed value.
+ *
+ * @return true if the implementation passes a basic self-test.
+ * Returns false otherwise.
+ */
+ boolean selfTest();
+
+ /**
+ *
Returns a clone copy of this instance.
+ *
+ * @return a clone copy of this instance.
+ */
+ Object clone();
+}
diff --git a/core/java/src/gnu/crypto/hash/Sha256Standalone.java b/core/java/src/gnu/crypto/hash/Sha256Standalone.java
index a42742e43e..85d5cb97e0 100644
--- a/core/java/src/gnu/crypto/hash/Sha256Standalone.java
+++ b/core/java/src/gnu/crypto/hash/Sha256Standalone.java
@@ -1,7 +1,7 @@
package gnu.crypto.hash;
// ----------------------------------------------------------------------------
-// $Id: Sha256Standalone.java,v 1.1 2006/02/26 16:30:59 jrandom Exp $
+// $Id: Sha256Standalone.java,v 1.2 2006/03/16 16:45:19 jrandom Exp $
//
// Copyright (C) 2003 Free Software Foundation, Inc.
//
@@ -59,9 +59,9 @@ package gnu.crypto.hash;
* renamed from Sha256 to avoid conflicts with JVMs using gnu-crypto as their JCE
* provider.
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
-public class Sha256Standalone extends BaseHash {
+public class Sha256Standalone extends BaseHashStandalone {
// Constants and variables
// -------------------------------------------------------------------------
private static final int[] k = {
@@ -143,7 +143,7 @@ public class Sha256Standalone extends BaseHash {
return new Sha256Standalone(this);
}
- // Implementation of concrete methods in BaseHash --------------------------
+ // Implementation of concrete methods in BaseHashStandalone --------------------------
private int transformResult[] = new int[8];
protected void transform(byte[] in, int offset) {
diff --git a/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java b/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java
new file mode 100644
index 0000000000..9b3031e8c9
--- /dev/null
+++ b/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java
@@ -0,0 +1,172 @@
+package gnu.crypto.prng;
+
+import java.util.*;
+
+/**
+ * fortuna instance that tries to avoid blocking if at all possible by using separate
+ * filled buffer segments rather than one buffer (and blocking when that buffer's data
+ * has been eaten)
+ */
+public class AsyncFortunaStandalone extends FortunaStandalone implements Runnable {
+ private static final int BUFFERS = 16;
+ private static final int BUFSIZE = 256*1024;
+ private final byte asyncBuffers[][] = new byte[BUFFERS][BUFSIZE];
+ private final int status[] = new int[BUFFERS];
+ private int nextBuf = 0;
+
+ private static final int STATUS_NEED_FILL = 0;
+ private static final int STATUS_FILLING = 1;
+ private static final int STATUS_FILLED = 2;
+ private static final int STATUS_LIVE = 3;
+
+ public AsyncFortunaStandalone() {
+ super();
+ for (int i = 0; i < BUFFERS; i++)
+ status[i] = STATUS_NEED_FILL;
+ }
+
+ public void startup() {
+ Thread refillThread = new Thread(this, "PRNG");
+ refillThread.setDaemon(true);
+ refillThread.setPriority(Thread.MIN_PRIORITY+1);
+ refillThread.start();
+ }
+
+ /** the seed is only propogated once the prng is started with startup() */
+ public void seed(byte val[]) {
+ Map props = new HashMap(1);
+ props.put(SEED, (Object)val);
+ init(props);
+ //fillBlock();
+ }
+
+ protected void allocBuffer() {}
+
+ /**
+ * make the next available filled buffer current, scheduling any unfilled
+ * buffers for refill, and blocking until at least one buffer is ready
+ */
+ protected void rotateBuffer() {
+ synchronized (asyncBuffers) {
+ // wait until we get some filled
+ long before = System.currentTimeMillis();
+ long waited = 0;
+ while (status[nextBuf] != STATUS_FILLED) {
+ System.out.println(Thread.currentThread().getName() + ": Next PRNG buffer "
+ + nextBuf + " isn't ready (" + status[nextBuf] + ")");
+ //new Exception("source").printStackTrace();
+ asyncBuffers.notifyAll();
+ try {
+ asyncBuffers.wait();
+ } catch (InterruptedException ie) {}
+ waited = System.currentTimeMillis()-before;
+ }
+ if (waited > 0)
+ System.out.println(Thread.currentThread().getName() + ": Took " + waited
+ + "ms for a full PRNG buffer to be found");
+ //System.out.println(Thread.currentThread().getName() + ": Switching to prng buffer " + nextBuf);
+ buffer = asyncBuffers[nextBuf];
+ status[nextBuf] = STATUS_LIVE;
+ int prev=nextBuf-1;
+ if (prev<0)
+ prev = BUFFERS-1;
+ if (status[prev] == STATUS_LIVE)
+ status[prev] = STATUS_NEED_FILL;
+ nextBuf++;
+ if (nextBuf >= BUFFERS)
+ nextBuf = 0;
+ asyncBuffers.notify();
+ }
+ }
+
+ public void run() {
+ while (true) {
+ int toFill = -1;
+ try {
+ synchronized (asyncBuffers) {
+ for (int i = 0; i < BUFFERS; i++) {
+ if (status[i] == STATUS_NEED_FILL) {
+ status[i] = STATUS_FILLING;
+ toFill = i;
+ break;
+ }
+ }
+ if (toFill == -1) {
+ //System.out.println(Thread.currentThread().getName() + ": All pending buffers full");
+ asyncBuffers.wait();
+ }
+ }
+ } catch (InterruptedException ie) {}
+
+ if (toFill != -1) {
+ //System.out.println(Thread.currentThread().getName() + ": Filling prng buffer " + toFill);
+ long before = System.currentTimeMillis();
+ doFill(asyncBuffers[toFill]);
+ long after = System.currentTimeMillis();
+ synchronized (asyncBuffers) {
+ status[toFill] = STATUS_FILLED;
+ //System.out.println(Thread.currentThread().getName() + ": Prng buffer " + toFill + " filled after " + (after-before));
+ asyncBuffers.notifyAll();
+ }
+ Thread.yield();
+ try { Thread.sleep((after-before)*5); } catch (InterruptedException ie) {}
+ }
+ }
+ }
+
+ public void fillBlock()
+ {
+ rotateBuffer();
+ }
+
+ private void doFill(byte buf[]) {
+ long start = System.currentTimeMillis();
+ if (pool0Count >= MIN_POOL_SIZE
+ && System.currentTimeMillis() - lastReseed > 100)
+ {
+ reseedCount++;
+ //byte[] seed = new byte[0];
+ for (int i = 0; i < NUM_POOLS; i++)
+ {
+ if (reseedCount % (1 << i) == 0) {
+ generator.addRandomBytes(pools[i].digest());
+ }
+ }
+ lastReseed = System.currentTimeMillis();
+ }
+ generator.nextBytes(buf);
+ long now = System.currentTimeMillis();
+ long diff = now-lastRefill;
+ lastRefill = now;
+ long refillTime = now-start;
+ //System.out.println("Refilling " + (++refillCount) + " after " + diff + " for the PRNG took " + refillTime);
+ }
+
+ public static void main(String args[]) {
+ try {
+ AsyncFortunaStandalone rand = new AsyncFortunaStandalone();
+
+ byte seed[] = new byte[1024];
+ rand.seed(seed);
+ System.out.println("Before starting prng");
+ rand.startup();
+ System.out.println("Starting prng, waiting 1 minute");
+ try { Thread.sleep(60*1000); } catch (InterruptedException ie) {}
+ System.out.println("PRNG started, beginning test");
+
+ long before = System.currentTimeMillis();
+ byte buf[] = new byte[1024];
+ java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
+ java.util.zip.GZIPOutputStream gos = new java.util.zip.GZIPOutputStream(baos);
+ for (int i = 0; i < 1024; i++) {
+ rand.nextBytes(buf);
+ gos.write(buf);
+ }
+ long after = System.currentTimeMillis();
+ gos.finish();
+ byte compressed[] = baos.toByteArray();
+ System.out.println("Compressed size of 1MB: " + compressed.length + " took " + (after-before));
+ } catch (Exception e) { e.printStackTrace(); }
+ try { Thread.sleep(5*60*1000); } catch (InterruptedException ie) {}
+ }
+}
diff --git a/core/java/src/gnu/crypto/prng/BasePRNGStandalone.java b/core/java/src/gnu/crypto/prng/BasePRNGStandalone.java
index c52f5cae71..f306c45780 100644
--- a/core/java/src/gnu/crypto/prng/BasePRNGStandalone.java
+++ b/core/java/src/gnu/crypto/prng/BasePRNGStandalone.java
@@ -49,7 +49,7 @@ import java.util.Map;
* Modified slightly by jrandom for I2P (removing unneeded exceptions)
* @version $Revision: 1.1 $
*/
-public abstract class BasePRNGStandalone implements IRandom {
+public abstract class BasePRNGStandalone implements IRandomStandalone {
// Constants and variables
// -------------------------------------------------------------------------
@@ -88,7 +88,7 @@ public abstract class BasePRNGStandalone implements IRandom {
// Instance methods
// -------------------------------------------------------------------------
- // IRandom interface implementation ----------------------------------------
+ // IRandomStandalone interface implementation ----------------------------------------
public String name() {
return name;
diff --git a/core/java/src/gnu/crypto/prng/FortunaStandalone.java b/core/java/src/gnu/crypto/prng/FortunaStandalone.java
index d6e2675865..764c042b43 100644
--- a/core/java/src/gnu/crypto/prng/FortunaStandalone.java
+++ b/core/java/src/gnu/crypto/prng/FortunaStandalone.java
@@ -97,20 +97,22 @@ import net.i2p.crypto.CryptixAESKeyCache;
* gnu-crypto implementation, which has been imported into GNU/classpath
*
*/
-public class FortunaStandalone extends BasePRNGStandalone implements Serializable, RandomEventListener
+public class FortunaStandalone extends BasePRNGStandalone implements Serializable, RandomEventListenerStandalone
{
private static final long serialVersionUID = 0xFACADE;
private static final int SEED_FILE_SIZE = 64;
- private static final int NUM_POOLS = 32;
- private static final int MIN_POOL_SIZE = 64;
- private final Generator generator;
- private final Sha256Standalone[] pools;
- private long lastReseed;
- private int pool;
- private int pool0Count;
- private int reseedCount;
+ static final int NUM_POOLS = 32;
+ static final int MIN_POOL_SIZE = 64;
+ final Generator generator;
+ final Sha256Standalone[] pools;
+ long lastReseed;
+ int pool;
+ int pool0Count;
+ int reseedCount;
+ static long refillCount = 0;
+ static long lastRefill = System.currentTimeMillis();
public static final String SEED = "gnu.crypto.prng.fortuna.seed";
@@ -124,6 +126,9 @@ public class FortunaStandalone extends BasePRNGStandalone implements Serializabl
lastReseed = 0;
pool = 0;
pool0Count = 0;
+ allocBuffer();
+ }
+ protected void allocBuffer() {
buffer = new byte[4*1024*1024]; //256]; // larger buffer to reduce churn
}
@@ -145,6 +150,7 @@ public class FortunaStandalone extends BasePRNGStandalone implements Serializabl
public void fillBlock()
{
+ long start = System.currentTimeMillis();
if (pool0Count >= MIN_POOL_SIZE
&& System.currentTimeMillis() - lastReseed > 100)
{
@@ -159,6 +165,11 @@ public class FortunaStandalone extends BasePRNGStandalone implements Serializabl
lastReseed = System.currentTimeMillis();
}
generator.nextBytes(buffer);
+ long now = System.currentTimeMillis();
+ long diff = now-lastRefill;
+ lastRefill = now;
+ long refillTime = now-start;
+ System.out.println("Refilling " + (++refillCount) + " after " + diff + " for the PRNG took " + refillTime);
}
public void addRandomByte(byte b)
@@ -177,7 +188,7 @@ public class FortunaStandalone extends BasePRNGStandalone implements Serializabl
pool = (pool + 1) % NUM_POOLS;
}
- public void addRandomEvent(RandomEvent event)
+ public void addRandomEvent(RandomEventStandalone event)
{
if (event.getPoolNumber() < 0 || event.getPoolNumber() >= pools.length)
throw new IllegalArgumentException("pool number out of range: "
@@ -338,6 +349,34 @@ public class FortunaStandalone extends BasePRNGStandalone implements Serializabl
}
public static void main(String args[]) {
+ byte in[] = new byte[16];
+ byte out[] = new byte[16];
+ byte key[] = new byte[32];
+ try {
+ CryptixAESKeyCache.KeyCacheEntry buf = CryptixAESKeyCache.createNew();
+ Object cryptixKey = CryptixRijndael_Algorithm.makeKey(key, 16, buf);
+ long beforeAll = System.currentTimeMillis();
+ for (int i = 0; i < 256; i++) {
+ //long before =System.currentTimeMillis();
+ for (int j = 0; j < 1024; j++)
+ CryptixRijndael_Algorithm.blockEncrypt(in, out, 0, 0, cryptixKey);
+ //long after = System.currentTimeMillis();
+ //System.out.println("encrypting 16KB took " + (after-before));
+ }
+ long after = System.currentTimeMillis();
+ System.out.println("encrypting 4MB took " + (after-beforeAll));
+ } catch (Exception e) { e.printStackTrace(); }
+
+ try {
+ CryptixAESKeyCache.KeyCacheEntry buf = CryptixAESKeyCache.createNew();
+ Object cryptixKey = CryptixRijndael_Algorithm.makeKey(key, 16, buf);
+ byte data[] = new byte[4*1024*1024];
+ long beforeAll = System.currentTimeMillis();
+ CryptixRijndael_Algorithm.ecbBulkEncrypt(data, data, cryptixKey);
+ long after = System.currentTimeMillis();
+ System.out.println("encrypting 4MB took " + (after-beforeAll));
+ } catch (Exception e) { e.printStackTrace(); }
+ /*
FortunaStandalone f = new FortunaStandalone();
java.util.HashMap props = new java.util.HashMap();
byte initSeed[] = new byte[1234];
@@ -351,5 +390,6 @@ public class FortunaStandalone extends BasePRNGStandalone implements Serializabl
}
long time = System.currentTimeMillis() - before;
System.out.println("512MB took " + time + ", or " + (8*64d)/((double)time/1000d) +"MBps");
+ */
}
}
diff --git a/core/java/src/gnu/crypto/prng/IRandom.java b/core/java/src/gnu/crypto/prng/IRandomStandalone.java
similarity index 81%
rename from core/java/src/gnu/crypto/prng/IRandom.java
rename to core/java/src/gnu/crypto/prng/IRandomStandalone.java
index 9dc2109cdb..a7f343378d 100644
--- a/core/java/src/gnu/crypto/prng/IRandom.java
+++ b/core/java/src/gnu/crypto/prng/IRandomStandalone.java
@@ -1,7 +1,7 @@
package gnu.crypto.prng;
// ----------------------------------------------------------------------------
-// $Id: IRandom.java,v 1.12 2005/10/06 04:24:17 rsdio Exp $
+// $Id: IRandomStandalone.java,v 1.1 2005/10/22 13:10:00 jrandom Exp $
//
// Copyright (C) 2001, 2002, 2003 Free Software Foundation, Inc.
//
@@ -82,9 +82,9 @@ import java.util.Map;
* Menezes, A., van Oorschot, P. and S. Vanstone.
*
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.1 $
*/
-public interface IRandom extends Cloneable {
+public interface IRandomStandalone extends Cloneable {
// Constants
// -------------------------------------------------------------------------
@@ -110,32 +110,32 @@ public interface IRandom extends Cloneable {
void init(Map attributes);
/**
- *
Returns the next 8 bits of random data generated from this instance.
- *
- * @return the next 8 bits of random data generated from this instance.
- * @exception IllegalStateException if the instance is not yet initialised.
- * @exception LimitReachedException if this instance has reached its
- * theoretical limit for generating non-repetitive pseudo-random data.
- */
- byte nextByte() throws IllegalStateException, LimitReachedException;
+ *
Returns the next 8 bits of random data generated from this instance.
+ *
+ * @return the next 8 bits of random data generated from this instance.
+ * @exception IllegalStateException if the instance is not yet initialised.
+ * @exception LimLimitReachedExceptionStandalone this instance has reached its
+ * theoretical limit for generating non-repetitive pseudo-random data.
+ */
+ byte nextByte() throws IllegalStateException, LimitReachedExceptionStandalone;
/**
- *
Fills the designated byte array, starting from byte at index
- * offset, for a maximum of length bytes with the
- * output of this generator instance.
- *
- * @param out the placeholder to contain the generated random bytes.
- * @param offset the starting index in out to consider. This method
- * does nothing if this parameter is not within 0 and
- * out.length.
- * @param length the maximum number of required random bytes. This method
- * does nothing if this parameter is less than 1.
- * @exception IllegalStateException if the instance is not yet initialised.
- * @exception LimitReachedException if this instance has reached its
- * theoretical limit for generating non-repetitive pseudo-random data.
- */
+ *
Fills the designated byte array, starting from byte at index
+ * offset, for a maximum of length bytes with the
+ * output of this generator instance.
+ *
+ * @param out the placeholder to contain the generated random bytes.
+ * @param offset the starting index in out to consider. This method
+ * does nothing if this parameter is not within 0 and
+ * out.length.
+ * @param length the maximum number of required random bytes. This method
+ * does nothing if this parameter is less than 1.
+ * @exception IllegalStateException if the instance is not yet initialised.
+ * @exception LimitLimitReachedExceptionStandalonehis instance has reached its
+ * theoretical limit for generating non-repetitive pseudo-random data.
+ */
void nextBytes(byte[] out, int offset, int length)
- throws IllegalStateException, LimitReachedException;
+ throws IllegalStateException, LimitReachedExceptionStandalone;
/**
*
Supplement, or possibly replace, the random state of this PRNG with
diff --git a/core/java/src/gnu/crypto/prng/LimitReachedException.java b/core/java/src/gnu/crypto/prng/LimitReachedExceptionStandalone.java
similarity index 90%
rename from core/java/src/gnu/crypto/prng/LimitReachedException.java
rename to core/java/src/gnu/crypto/prng/LimitReachedExceptionStandalone.java
index 4e5f26f3f0..509a2d9713 100644
--- a/core/java/src/gnu/crypto/prng/LimitReachedException.java
+++ b/core/java/src/gnu/crypto/prng/LimitReachedExceptionStandalone.java
@@ -1,7 +1,7 @@
package gnu.crypto.prng;
// ----------------------------------------------------------------------------
-// $Id: LimitReachedException.java,v 1.5 2005/10/06 04:24:17 rsdio Exp $
+// $Id: LimitReachedExceptionStandalone.java,v 1.1 2005/10/22 13:10:00 jrandom Exp $
//
// Copyright (C) 2001, 2002, Free Software Foundation, Inc.
//
@@ -47,9 +47,9 @@ package gnu.crypto.prng;
* A checked exception that indicates that a pseudo random number generated has
* reached its theoretical limit in generating random bytes.
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.1 $
*/
-public class LimitReachedException extends Exception {
+public class LimitReachedExceptionStandalone extends Exception {
// Constants and variables
// -------------------------------------------------------------------------
@@ -57,11 +57,11 @@ public class LimitReachedException extends Exception {
// Constructor(s)
// -------------------------------------------------------------------------
- public LimitReachedException() {
+ public LimitReachedExceptionStandalone() {
super();
}
- public LimitReachedException(String msg) {
+ public LimitReachedExceptionStandalone(String msg) {
super(msg);
}
diff --git a/core/java/src/gnu/crypto/prng/RandomEventListener.java b/core/java/src/gnu/crypto/prng/RandomEventListenerStandalone.java
similarity index 91%
rename from core/java/src/gnu/crypto/prng/RandomEventListener.java
rename to core/java/src/gnu/crypto/prng/RandomEventListenerStandalone.java
index 0c4542217f..dee897bfd0 100644
--- a/core/java/src/gnu/crypto/prng/RandomEventListener.java
+++ b/core/java/src/gnu/crypto/prng/RandomEventListenerStandalone.java
@@ -1,4 +1,4 @@
-/* RandomEventListener.java -- event listener
+/* RandomEventListenerStandalone.java -- event listener
Copyright (C) 2004 Free Software Foundation, Inc.
This file is part of GNU Crypto.
@@ -47,7 +47,7 @@ import java.util.EventListener;
* An interface for entropy accumulators that will be notified of random
* events.
*/
-public interface RandomEventListener extends EventListener
+public interface RandomEventListenerStandalone extends EventListener
{
- void addRandomEvent(RandomEvent event);
+ void addRandomEvent(RandomEventStandalone event);
}
diff --git a/core/java/src/gnu/crypto/prng/RandomEvent.java b/core/java/src/gnu/crypto/prng/RandomEventStandalone.java
similarity index 92%
rename from core/java/src/gnu/crypto/prng/RandomEvent.java
rename to core/java/src/gnu/crypto/prng/RandomEventStandalone.java
index f9a678f951..666439df2a 100644
--- a/core/java/src/gnu/crypto/prng/RandomEvent.java
+++ b/core/java/src/gnu/crypto/prng/RandomEventStandalone.java
@@ -1,4 +1,4 @@
-/* RandomEvent.java -- a random event.
+/* RandomEventStandalone.java -- a random event.
Copyright (C) 2004 Free Software Foundation, Inc.
This file is part of GNU Crypto.
@@ -47,14 +47,14 @@ import java.util.EventObject;
* An interface for entropy accumulators that will be notified of random
* events.
*/
-public class RandomEvent extends EventObject
+public class RandomEventStandalone extends EventObject
{
private final byte sourceNumber;
private final byte poolNumber;
private final byte[] data;
- public RandomEvent(Object source, byte sourceNumber, byte poolNumber,
+ public RandomEventStandalone(Object source, byte sourceNumber, byte poolNumber,
byte[] data)
{
super(source);
diff --git a/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java b/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java
index a1754a3ad2..949d3a70b2 100644
--- a/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java
+++ b/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java
@@ -320,9 +320,9 @@ public class DHSessionKeyBuilder {
if (_myPrivateValue == null) generateMyValue();
_sessionKey = calculateSessionKey(_myPrivateValue, _peerValue);
} else {
- System.err.println("Not ready yet.. privateValue and peerValue must be set ("
- + (_myPrivateValue != null ? "set" : "null") + ","
- + (_peerValue != null ? "set" : "null") + ")");
+ //System.err.println("Not ready yet.. privateValue and peerValue must be set ("
+ // + (_myPrivateValue != null ? "set" : "null") + ","
+ // + (_peerValue != null ? "set" : "null") + ")");
}
return _sessionKey;
}
diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java
index f7a2579ce9..fbe0448b46 100644
--- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java
+++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java
@@ -67,7 +67,7 @@ class TransientSessionKeyManager extends SessionKeyManager {
_log = context.logManager().getLog(TransientSessionKeyManager.class);
_context = context;
_outboundSessions = new HashMap(1024);
- _inboundTagSets = new HashMap(64*1024);
+ _inboundTagSets = new HashMap(1024);
context.statManager().createRateStat("crypto.sessionTagsExpired", "How many tags/sessions are expired?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
context.statManager().createRateStat("crypto.sessionTagsRemaining", "How many tags/sessions are remaining after a cleanup?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
SimpleTimer.getInstance().addEvent(new CleanupEvent(), 60*1000);
diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java
index b66749c0d9..ea84546112 100644
--- a/core/java/src/net/i2p/data/DataHelper.java
+++ b/core/java/src/net/i2p/data/DataHelper.java
@@ -822,6 +822,8 @@ public class DataHelper {
return (ms / (60 * 1000)) + "m";
} else if (ms < 3 * 24 * 60 * 60 * 1000) {
return (ms / (60 * 60 * 1000)) + "h";
+ } else if (ms > 365 * 24 * 60 * 60 * 1000) {
+ return "n/a";
} else {
return (ms / (24 * 60 * 60 * 1000)) + "d";
}
diff --git a/core/java/src/net/i2p/util/FortunaRandomSource.java b/core/java/src/net/i2p/util/FortunaRandomSource.java
index 74eb4938cc..2d1a691965 100644
--- a/core/java/src/net/i2p/util/FortunaRandomSource.java
+++ b/core/java/src/net/i2p/util/FortunaRandomSource.java
@@ -14,7 +14,7 @@ import java.security.SecureRandom;
import net.i2p.I2PAppContext;
import net.i2p.crypto.EntropyHarvester;
-import gnu.crypto.prng.FortunaStandalone;
+import gnu.crypto.prng.AsyncFortunaStandalone;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -26,13 +26,13 @@ import java.io.IOException;
*
*/
public class FortunaRandomSource extends RandomSource implements EntropyHarvester {
- private FortunaStandalone _fortuna;
+ private AsyncFortunaStandalone _fortuna;
private double _nextGaussian;
private boolean _haveNextGaussian;
public FortunaRandomSource(I2PAppContext context) {
super(context);
- _fortuna = new FortunaStandalone();
+ _fortuna = new AsyncFortunaStandalone();
byte seed[] = new byte[1024];
if (initSeed(seed)) {
_fortuna.seed(seed);
@@ -41,6 +41,7 @@ public class FortunaRandomSource extends RandomSource implements EntropyHarveste
sr.nextBytes(seed);
_fortuna.seed(seed);
}
+ _fortuna.startup();
// kickstart it
_fortuna.nextBytes(seed);
_haveNextGaussian = false;
@@ -202,6 +203,13 @@ public class FortunaRandomSource extends RandomSource implements EntropyHarveste
public static void main(String args[]) {
try {
RandomSource rand = I2PAppContext.getGlobalContext().random();
+ if (true) {
+ for (int i = 0; i < 1000; i++)
+ if (rand.nextFloat() < 0)
+ throw new RuntimeException("negative!");
+ System.out.println("All positive");
+ return;
+ }
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
java.util.zip.GZIPOutputStream gos = new java.util.zip.GZIPOutputStream(baos);
for (int i = 0; i < 1024*1024; i++) {
diff --git a/core/java/src/net/i2p/util/I2PThread.java b/core/java/src/net/i2p/util/I2PThread.java
index b4bd8d740c..f33d6f08d5 100644
--- a/core/java/src/net/i2p/util/I2PThread.java
+++ b/core/java/src/net/i2p/util/I2PThread.java
@@ -48,6 +48,12 @@ public class I2PThread extends Thread {
if ( (_log == null) || (_log.shouldLog(Log.DEBUG)) )
_createdBy = new Exception("Created by");
}
+ public I2PThread(Runnable r, String name, boolean isDaemon) {
+ super(r, name);
+ setDaemon(isDaemon);
+ if ( (_log == null) || (_log.shouldLog(Log.DEBUG)) )
+ _createdBy = new Exception("Created by");
+ }
private void log(int level, String msg) { log(level, msg, null); }
private void log(int level, String msg, Throwable t) {
@@ -113,4 +119,4 @@ public class I2PThread extends Thread {
} catch (Throwable tt) { // nop
}
}
-}
\ No newline at end of file
+}
diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java
index 9cde50032f..c86b1379b9 100644
--- a/core/java/src/net/i2p/util/SimpleTimer.java
+++ b/core/java/src/net/i2p/util/SimpleTimer.java
@@ -31,14 +31,14 @@ public class SimpleTimer {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(SimpleTimer.class);
_events = new TreeMap();
- _eventTimes = new HashMap(1024);
+ _eventTimes = new HashMap(256);
_readyEvents = new ArrayList(4);
I2PThread runner = new I2PThread(new SimpleTimerRunner());
runner.setName(name);
runner.setDaemon(true);
runner.start();
for (int i = 0; i < 3; i++) {
- I2PThread executor = new I2PThread(new Executor());
+ I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents));
executor.setName(name + "Executor " + i);
executor.setDaemon(true);
executor.start();
@@ -114,7 +114,7 @@ public class SimpleTimer {
long timeToAdd = System.currentTimeMillis() - now;
if (timeToAdd > 50) {
if (_log.shouldLog(Log.WARN))
- _log.warn("timer contention: took " + timeToAdd + "ms to add a job");
+ _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
}
}
@@ -141,14 +141,6 @@ public class SimpleTimer {
public void timeReached();
}
- private void log(String msg, Throwable t) {
- synchronized (this) {
- if (_log == null)
- _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
- }
- _log.log(Log.CRIT, msg, t);
- }
-
private long _occurredTime;
private long _occurredEventCount;
private TimedEvent _recentEvents[] = new TimedEvent[5];
@@ -228,30 +220,45 @@ public class SimpleTimer {
}
}
}
-
- private class Executor implements Runnable {
- public void run() {
- while (true) {
- TimedEvent evt = null;
- synchronized (_readyEvents) {
- if (_readyEvents.size() <= 0)
- try { _readyEvents.wait(); } catch (InterruptedException ie) {}
- if (_readyEvents.size() > 0)
- evt = (TimedEvent)_readyEvents.remove(0);
- }
-
- if (evt != null) {
- long before = _context.clock().now();
- try {
- evt.timeReached();
- } catch (Throwable t) {
- log("wtf, event borked: " + evt, t);
- }
- long time = _context.clock().now() - before;
- if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
- _log.warn("wtf, event execution took " + time + ": " + evt);
+}
+
+class Executor implements Runnable {
+ private I2PAppContext _context;
+ private Log _log;
+ private List _readyEvents;
+ public Executor(I2PAppContext ctx, Log log, List events) {
+ _context = ctx;
+ _readyEvents = events;
+ }
+ public void run() {
+ while (true) {
+ SimpleTimer.TimedEvent evt = null;
+ synchronized (_readyEvents) {
+ if (_readyEvents.size() <= 0)
+ try { _readyEvents.wait(); } catch (InterruptedException ie) {}
+ if (_readyEvents.size() > 0)
+ evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
+ }
+
+ if (evt != null) {
+ long before = _context.clock().now();
+ try {
+ evt.timeReached();
+ } catch (Throwable t) {
+ log("wtf, event borked: " + evt, t);
}
+ long time = _context.clock().now() - before;
+ if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("wtf, event execution took " + time + ": " + evt);
}
}
}
+
+ private void log(String msg, Throwable t) {
+ synchronized (this) {
+ if (_log == null)
+ _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
+ }
+ _log.log(Log.CRIT, msg, t);
+ }
}
diff --git a/history.txt b/history.txt
index 7c77dfe93a..bfc35f21ff 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,20 @@
-$Id: history.txt,v 1.490 2006-06-14 00:24:35 cervantes Exp $
+$Id: history.txt,v 1.491 2006-07-01 17:44:34 complication Exp $
+
+2006-07-04 jrandom
+ * New NIO-based tcp transport (NTCP), enabled by default for outbound
+ connections only. Those who configure their NAT/firewall to allow
+ inbound connections and specify the external host and port
+ (dyndns/etc is ok) on /config.jsp can receive inbound connections.
+ SSU is still enabled for use by default for all users as a fallback.
+ * Substantial bugfix to the tunnel gateway processing to transfer
+ messages sequentially instead of interleaved
+ * Renamed GNU/crypto classes to avoid name clashes with kaffe and other
+ GNU/Classpath based JVMs
+ * Adjust the Fortuna PRNG's pooling system to reduce contention on
+ refill with a background thread to refill the output buffer
+ * Add per-transport support for the shitlist
+ * Add a new async pumped tunnel gateway to reduce tunnel dispatcher
+ contention
2006-07-01 Complication
* Ensure that the I2PTunnel web interface won't update tunnel settings
diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java
index c19ce5f181..2240af423c 100644
--- a/router/java/src/net/i2p/router/OutNetMessage.java
+++ b/router/java/src/net/i2p/router/OutNetMessage.java
@@ -48,6 +48,7 @@ public class OutNetMessage {
private MessageSelector _replySelector;
private Set _failedTransports;
private long _sendBegin;
+ private long _transmitBegin;
private Exception _createdBy;
private long _created;
/** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */
@@ -57,6 +58,10 @@ public class OutNetMessage {
* (some JVMs have less than 10ms resolution, so the Long above doesn't guarantee order)
*/
private List _timestampOrder;
+ private int _queueSize;
+ private long _prepareBegin;
+ private long _prepareEnd;
+ private Object _preparationBuf;
public OutNetMessage(RouterContext context) {
_context = context;
@@ -148,6 +153,7 @@ public class OutNetMessage {
_messageType = msg.getClass().getName();
_messageTypeId = msg.getType();
_messageId = msg.getUniqueId();
+ _messageSize = _message.getMessageSize();
}
}
@@ -235,9 +241,31 @@ public class OutNetMessage {
/** when did the sending process begin */
public long getSendBegin() { return _sendBegin; }
public void beginSend() { _sendBegin = _context.clock().now(); }
+ public void beginTransmission() { _transmitBegin = _context.clock().now(); }
+ public void beginPrepare() { _prepareBegin = _context.clock().now(); }
+ public void prepared() { prepared(null); }
+ public void prepared(Object buf) {
+ _prepareEnd = _context.clock().now();
+ _preparationBuf = buf;
+ }
+ public Object releasePreparationBuffer() {
+ Object rv = _preparationBuf;
+ _preparationBuf = null;
+ return rv;
+ }
public long getCreated() { return _created; }
+ /** time since the message was created */
public long getLifetime() { return _context.clock().now() - _created; }
+ /** time the transport tries to send the message (including any queueing) */
+ public long getSendTime() { return _context.clock().now() - _sendBegin; }
+ /** time during which the i2np message is actually in flight */
+ public long getTransmissionTime() { return _context.clock().now() - _transmitBegin; }
+ /** how long it took to prepare the i2np message for transmission (including serialization and transport layer encryption) */
+ public long getPreparationTime() { return _prepareEnd - _prepareBegin; }
+ /** number of messages ahead of this one going to the targetted peer when it is first enqueued */
+ public int getQueueSize() { return _queueSize; }
+ public void setQueueSize(int size) { _queueSize = size; }
/**
* We've done what we need to do with the data from this message, though
@@ -245,6 +273,8 @@ public class OutNetMessage {
*/
public void discardData() {
long timeToDiscard = _context.clock().now() - _created;
+ if ( (_message != null) && (_messageSize <= 0) )
+ _messageSize = _message.getMessageSize();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Discard " + _messageSize + "byte " + _messageType + " message after "
+ timeToDiscard);
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index d208f3b7a3..e75936cc3b 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
- public final static String ID = "$Revision: 1.429 $ $Date: 2006-06-13 21:17:48 $";
+ public final static String ID = "$Revision: 1.430 $ $Date: 2006-07-01 17:44:36 $";
public final static String VERSION = "0.6.1.21";
- public final static long BUILD = 1;
+ public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/Shitlist.java b/router/java/src/net/i2p/router/Shitlist.java
index e46b545a89..ea3adf761a 100644
--- a/router/java/src/net/i2p/router/Shitlist.java
+++ b/router/java/src/net/i2p/router/Shitlist.java
@@ -10,10 +10,8 @@ package net.i2p.router;
import java.io.IOException;
import java.io.Writer;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
+import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.router.peermanager.PeerProfile;
@@ -28,29 +26,36 @@ import net.i2p.util.Log;
public class Shitlist {
private Log _log;
private RouterContext _context;
- private Map _shitlist; // H(routerIdent) --> Date
- private Map _shitlistCause; // H(routerIdent) --> String
+ private Map _entries;
+
+ private class Entry {
+ /** when it should expire, per the i2p clock */
+ long expireOn;
+ /** why they were shitlisted */
+ String cause;
+ /** what transports they were shitlisted for (String), or null for all transports */
+ Set transports;
+ }
public final static long SHITLIST_DURATION_MS = 4*60*1000; // 4 minute shitlist
public Shitlist(RouterContext context) {
_context = context;
_log = context.logManager().getLog(Shitlist.class);
- _shitlist = new HashMap(5);
- _shitlistCause = new HashMap(5);
+ _entries = new HashMap(32);
}
public int getRouterCount() {
- purge();
- synchronized (_shitlist) {
- return _shitlist.size();
+ synchronized (_entries) {
+ return _entries.size();
}
}
public boolean shitlistRouter(Hash peer) {
return shitlistRouter(peer, null);
}
- public boolean shitlistRouter(Hash peer, String reason) {
+ public boolean shitlistRouter(Hash peer, String reason) { return shitlistRouter(peer, reason, null); }
+ public boolean shitlistRouter(Hash peer, String reason, String transport) {
if (peer == null) {
_log.error("wtf, why did we try to shitlist null?", new Exception("shitfaced"));
return false;
@@ -73,14 +78,26 @@ public class Shitlist {
if (period > 60*60*1000)
period = 60*60*1000;
- synchronized (_shitlist) {
- Date oldDate = (Date)_shitlist.put(peer, new Date(_context.clock().now() + period));
- wasAlready = (null == oldDate);
- if (reason != null) {
- if (!wasAlready || (!_shitlistCause.containsKey(peer)) )
- _shitlistCause.put(peer, reason);
- } else {
- _shitlistCause.remove(peer);
+ Entry e = new Entry();
+ e.expireOn = _context.clock().now() + period;
+ e.cause = reason;
+ e.transports = null;
+ if (transport != null) {
+ e.transports = new HashSet(1);
+ e.transports.add(transport);
+ }
+
+ synchronized (_entries) {
+ Entry old = (Entry)_entries.put(peer, e);
+ if (old != null) {
+ wasAlready = true;
+ _entries.put(peer, old);
+ if (e.transports == null) {
+ old.transports = null;
+ } else if (old.transports != null) {
+ old.transports.addAll(e.transports);
+ }
+ e = old;
}
}
@@ -95,87 +112,89 @@ public class Shitlist {
public void unshitlistRouter(Hash peer) {
unshitlistRouter(peer, true);
}
- private void unshitlistRouter(Hash peer, boolean realUnshitlist) {
+ private void unshitlistRouter(Hash peer, boolean realUnshitlist) { unshitlistRouter(peer, realUnshitlist, null); }
+ public void unshitlistRouter(Hash peer, String transport) { unshitlistRouter(peer, true, transport); }
+ private void unshitlistRouter(Hash peer, boolean realUnshitlist, String transport) {
if (peer == null) return;
if (_log.shouldLog(Log.INFO))
- _log.info("Unshitlisting router " + peer.toBase64());
- synchronized (_shitlist) {
- _shitlist.remove(peer);
- _shitlistCause.remove(peer);
+ _log.info("Unshitlisting router " + peer.toBase64()
+ + (transport != null ? "/" + transport : ""));
+ boolean fully = false;
+ synchronized (_entries) {
+ Entry e = (Entry)_entries.remove(peer);
+ if ( (e == null) || (e.transports == null) || (transport == null) || (e.transports.size() <= 1) ) {
+ // fully unshitlisted
+ fully = true;
+ } else {
+ e.transports.remove(transport);
+ _entries.put(peer, e);
+ }
}
- if (realUnshitlist) {
+ if (fully) {
+ if (realUnshitlist) {
+ PeerProfile prof = _context.profileOrganizer().getProfile(peer);
+ if (prof != null)
+ prof.unshitlist();
+ }
+ _context.messageHistory().unshitlist(peer);
+ }
+ }
+
+ public boolean isShitlisted(Hash peer) { return isShitlisted(peer, null); }
+ public boolean isShitlisted(Hash peer, String transport) {
+ boolean rv = false;
+ boolean unshitlist = false;
+ synchronized (_entries) {
+ Entry entry = (Entry)_entries.get(peer);
+ if (entry == null) {
+ rv = false;
+ } else {
+ if (entry.expireOn <= _context.clock().now()) {
+ _entries.remove(peer);
+ unshitlist = true;
+ rv = false;
+ } else {
+ if (entry.transports == null) {
+ rv = true;
+ } else if (entry.transports.contains(transport)) {
+ rv = true;
+ } else {
+ rv = false;
+ }
+ }
+ }
+ }
+
+ if (unshitlist) {
PeerProfile prof = _context.profileOrganizer().getProfile(peer);
if (prof != null)
prof.unshitlist();
- }
- _context.messageHistory().unshitlist(peer);
- }
-
- public boolean isShitlisted(Hash peer) {
- Date shitlistDate = null;
- synchronized (_shitlist) {
- shitlistDate = (Date)_shitlist.get(peer);
- }
- if (shitlistDate == null) return false;
-
- // check validity
- if (shitlistDate.getTime() > _context.clock().now()) {
- return true;
- } else {
- unshitlistRouter(peer, false);
- return false;
- }
- }
-
- /**
- * We already unshitlist on isShitlisted, but this purge
- * lets us get the correct value when rendering the HTML or
- * getting the shitlist count. wheee
- *
- */
- private void purge() {
- Map shitlist = null;
- synchronized (_shitlist) {
- shitlist = new HashMap(_shitlist);
+ _context.messageHistory().unshitlist(peer);
}
- long limit = _context.clock().now();
-
- for (Iterator iter = shitlist.keySet().iterator(); iter.hasNext(); ) {
- Hash key = (Hash)iter.next();
- Date shitDate = (Date)shitlist.get(key);
- if (shitDate.getTime() < limit) {
- unshitlistRouter(key, false);
- }
- }
+ return rv;
}
-
public void renderStatusHTML(Writer out) throws IOException {
StringBuffer buf = new StringBuffer(1024);
buf.append("
\n");
+ out.write(buf.toString());
+ buf.setLength(0);
+ }
+
+ private static NumberFormat _rateFmt = new DecimalFormat("#,#00.00");
+ private static String formatRate(float rate) {
+ synchronized (_rateFmt) { return _rateFmt.format(rate); }
+ }
+
+ private Comparator getComparator(int sortFlags) {
+ Comparator rv = null;
+ switch (Math.abs(sortFlags)) {
+ default:
+ rv = AlphaComparator.instance();
+ }
+ if (sortFlags < 0)
+ rv = new InverseComparator(rv);
+ return rv;
+ }
+ private static class AlphaComparator extends PeerComparator {
+ private static final AlphaComparator _instance = new AlphaComparator();
+ public static final AlphaComparator instance() { return _instance; }
+ }
+ private static class InverseComparator implements Comparator {
+ private Comparator _comp;
+ public InverseComparator(Comparator comp) { _comp = comp; }
+ public int compare(Object lhs, Object rhs) {
+ return -1 * _comp.compare(lhs, rhs);
+ }
+ }
+ private static class PeerComparator implements Comparator {
+ public int compare(Object lhs, Object rhs) {
+ if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection))
+ throw new IllegalArgumentException("rhs = " + rhs + " lhs = " + lhs);
+ return compare((NTCPConnection)lhs, (NTCPConnection)rhs);
+ }
+ protected int compare(NTCPConnection l, NTCPConnection r) {
+ // base64 retains binary ordering
+ return DataHelper.compareTo(l.getRemotePeer().calculateHash().getData(), r.getRemotePeer().calculateHash().getData());
+ }
+ }
+
+ /**
+ * Cache the bid to reduce object churn
+ */
+ private class SharedBid extends TransportBid {
+ public SharedBid(int ms) { super(); setLatencyMs(ms); }
+ public Transport getTransport() { return NTCPTransport.this; }
+ public String toString() { return "NTCP bid @ " + getLatencyMs(); }
+ }
+}
diff --git a/router/java/src/net/i2p/router/transport/ntcp/Reader.java b/router/java/src/net/i2p/router/transport/ntcp/Reader.java
new file mode 100644
index 0000000000..af69edf630
--- /dev/null
+++ b/router/java/src/net/i2p/router/transport/ntcp/Reader.java
@@ -0,0 +1,165 @@
+package net.i2p.router.transport.ntcp;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import net.i2p.router.RouterContext;
+import net.i2p.util.I2PThread;
+import net.i2p.util.Log;
+
+/**
+ * Pool of running threads which will process any read bytes on any of the
+ * NTCPConnections, including the decryption of the data read, connection
+ * handshaking, parsing bytes into I2NP messages, etc.
+ *
+ */
+class Reader {
+ private RouterContext _context;
+ private Log _log;
+ private List _pendingConnections;
+ private List _liveReads;
+ private List _readAfterLive;
+ private List _runners;
+
+ public Reader(RouterContext ctx) {
+ _context = ctx;
+ _log = ctx.logManager().getLog(getClass());
+ _pendingConnections = new ArrayList(16);
+ _runners = new ArrayList(5);
+ _liveReads = new ArrayList(5);
+ _readAfterLive = new ArrayList();
+ }
+
+ public void startReading(int numReaders) {
+ for (int i = 0; i < numReaders; i++) {
+ Runner r = new Runner();
+ I2PThread t = new I2PThread(r, "NTCP read " + i, true);
+ _runners.add(r);
+ t.start();
+ }
+ }
+ public void stopReading() {
+ while (_runners.size() > 0) {
+ Runner r = (Runner)_runners.remove(0);
+ r.stop();
+ }
+ synchronized (_pendingConnections) {
+ _readAfterLive.clear();
+ _pendingConnections.notifyAll();
+ }
+ }
+
+ public void wantsRead(NTCPConnection con) {
+ boolean already = false;
+ synchronized (_pendingConnections) {
+ if (_liveReads.contains(con)) {
+ if (!_readAfterLive.contains(con)) {
+ _readAfterLive.add(con);
+ }
+ already = true;
+ } else if (!_pendingConnections.contains(con)) {
+ _pendingConnections.add(con);
+ }
+ _pendingConnections.notifyAll();
+ }
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("wantsRead: " + con + " already live? " + already);
+ }
+ public void connectionClosed(NTCPConnection con) {
+ synchronized (_pendingConnections) {
+ _readAfterLive.remove(con);
+ _pendingConnections.remove(con);
+ _pendingConnections.notifyAll();
+ }
+ }
+
+ private class Runner implements Runnable {
+ private boolean _stop;
+ public Runner() { _stop = false; }
+ public void stop() { _stop = true; }
+ public void run() {
+ if (_log.shouldLog(Log.INFO)) _log.info("Starting reader");
+ NTCPConnection con = null;
+ while (!_stop) {
+ try {
+ synchronized (_pendingConnections) {
+ boolean keepReading = (con != null) && _readAfterLive.remove(con);
+ if (keepReading) {
+ // keep on reading the same one
+ } else {
+ _liveReads.remove(con);
+ con = null;
+ if (_pendingConnections.size() <= 0) {
+ _pendingConnections.wait();
+ } else {
+ con = (NTCPConnection)_pendingConnections.remove(0);
+ _liveReads.add(con);
+ }
+ }
+ }
+ } catch (InterruptedException ie) {}
+ if (!_stop && (con != null) ) {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("begin read for " + con);
+ try {
+ processRead(con);
+ } catch (RuntimeException re) {
+ _log.log(Log.CRIT, "Error in the ntcp reader", re);
+ }
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("end read for " + con);
+ }
+ }
+ if (_log.shouldLog(Log.INFO)) _log.info("Stopping reader");
+ }
+ }
+
+ /**
+ * process everything read
+ */
+ private void processRead(NTCPConnection con) {
+ if (con.isClosed())
+ return;
+ ByteBuffer buf = null;
+ while (!con.isClosed() && !con.isEstablished() && ( (buf = con.getNextReadBuf()) != null) ) {
+ EstablishState est = con.getEstablishState();
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]");
+ if (est == null) {
+ if (!con.isEstablished()) {
+ // establish state is only removed when the connection is fully established,
+ // yet if that happens, con.isEstablished() should return true...
+ throw new RuntimeException("connection was not established, yet the establish state is null for " + con);
+ } else {
+ // hmm, there shouldn't be a race here - only one reader should
+ // be running on a con at a time...
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("no establishment state but " + con + " is established... race?");
+ break;
+ }
+ }
+ if (est.isComplete()) {
+ // why is it complete yet !con.isEstablished?
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
+ + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
+ break;
+ }
+ est.receive(buf);
+ if (est.isCorrupt()) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("closing connection on establishment because: " +est.getError(), est.getException());
+ con.close();
+ return;
+ } else if (buf.remaining() <= 0) {
+ con.removeReadBuf(buf);
+ }
+ }
+ while (!con.isClosed() && (buf = con.getNextReadBuf()) != null) {
+ // decrypt the data and push it into an i2np message
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)");
+ con.recvEncryptedI2NP(buf);
+ con.removeReadBuf(buf);
+ }
+ }
+}
diff --git a/router/java/src/net/i2p/router/transport/ntcp/Writer.java b/router/java/src/net/i2p/router/transport/ntcp/Writer.java
new file mode 100644
index 0000000000..d808e8a64b
--- /dev/null
+++ b/router/java/src/net/i2p/router/transport/ntcp/Writer.java
@@ -0,0 +1,113 @@
+package net.i2p.router.transport.ntcp;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import net.i2p.router.RouterContext;
+import net.i2p.util.I2PThread;
+import net.i2p.util.Log;
+
+/**
+ * Pool of running threads which will transform the next I2NP message into
+ * something ready to be transferred over an NTCP connection, including the
+ * encryption of the data read.
+ *
+ */
+class Writer {
+ private RouterContext _context;
+ private Log _log;
+ private List _pendingConnections;
+ private List _liveWrites;
+ private List _writeAfterLive;
+ private List _runners;
+
+ public Writer(RouterContext ctx) {
+ _context = ctx;
+ _log = ctx.logManager().getLog(getClass());
+ _pendingConnections = new ArrayList(16);
+ _runners = new ArrayList(5);
+ _liveWrites = new ArrayList(5);
+ _writeAfterLive = new ArrayList(5);
+ }
+
+ public void startWriting(int numWriters) {
+ for (int i = 0; i < numWriters; i++) {
+ Runner r = new Runner();
+ I2PThread t = new I2PThread(r, "NTCP write " + i, true);
+ _runners.add(r);
+ t.start();
+ }
+ }
+ public void stopWriting() {
+ while (_runners.size() > 0) {
+ Runner r = (Runner)_runners.remove(0);
+ r.stop();
+ }
+ synchronized (_pendingConnections) {
+ _writeAfterLive.clear();
+ _pendingConnections.notifyAll();
+ }
+ }
+
+ public void wantsWrite(NTCPConnection con) {
+ //if (con.getCurrentOutbound() != null)
+ // throw new RuntimeException("Current outbound message already in play on " + con);
+ boolean already = false;
+ synchronized (_pendingConnections) {
+ if (_liveWrites.contains(con)) {
+ if (!_writeAfterLive.contains(con)) {
+ _writeAfterLive.add(con);
+ }
+ already = true;
+ } else if (!_pendingConnections.contains(con)) {
+ _pendingConnections.add(con);
+ }
+ _pendingConnections.notifyAll();
+ }
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("wantsWrite: " + con + " already live? " + already);
+ }
+ public void connectionClosed(NTCPConnection con) {
+ synchronized (_pendingConnections) {
+ _writeAfterLive.remove(con);
+ _pendingConnections.remove(con);
+ _pendingConnections.notifyAll();
+ }
+ }
+
+ private class Runner implements Runnable {
+ private boolean _stop;
+ public Runner() { _stop = false; }
+ public void stop() { _stop = true; }
+ public void run() {
+ if (_log.shouldLog(Log.INFO)) _log.info("Starting writer");
+ NTCPConnection con = null;
+ while (!_stop) {
+ try {
+ synchronized (_pendingConnections) {
+ boolean keepWriting = (con != null) && _writeAfterLive.remove(con);
+ if (keepWriting) {
+ // keep on writing the same one
+ } else {
+ _liveWrites.remove(con);
+ con = null;
+ if (_pendingConnections.size() <= 0) {
+ _pendingConnections.wait();
+ } else {
+ con = (NTCPConnection)_pendingConnections.remove(0);
+ _liveWrites.add(con);
+ }
+ }
+ }
+ } catch (InterruptedException ie) {}
+ if (!_stop && (con != null)) {
+ try {
+ con.prepareNextWrite();
+ } catch (RuntimeException re) {
+ _log.log(Log.CRIT, "Error in the ntcp writer", re);
+ }
+ }
+ }
+ if (_log.shouldLog(Log.INFO)) _log.info("Stopping writer");
+ }
+ }
+}
diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
index 381e4a469d..d0ef6e7a99 100644
--- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
+++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
@@ -299,7 +299,7 @@ public class TCPTransport extends TransportImpl {
}
}
- _context.shitlist().unshitlistRouter(ident.calculateHash());
+ _context.shitlist().unshitlistRouter(ident.calculateHash(), STYLE);
con.runConnection();
if (_log.shouldLog(Log.DEBUG))
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 7a757c9187..4ff591b562 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -533,7 +533,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup"));
_activeThrottle.unchoke(peer.getRemotePeer());
- _context.shitlist().unshitlistRouter(peer.getRemotePeer());
+ _context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE);
if (SHOULD_FLOOD_PEERS)
_flooder.addPeer(peer);
diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java
index 30822c4a3d..01288a2a8a 100644
--- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java
+++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java
@@ -65,19 +65,35 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
}
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
- if (_log.shouldLog(Log.DEBUG))
+ StringBuffer timingBuf = null;
+ if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Preprocess queue with " + pending.size() + " to send");
-
+ timingBuf = new StringBuffer(128);
+ timingBuf.append("Preprocess with " + pending.size() + " to send. ");
+ }
+ if (DISABLE_BATCHING) {
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Disabled batching, pushing " + pending + " immediately");
+ return super.preprocessQueue(pending, sender, rec);
+ }
+ long start = System.currentTimeMillis();
+
int batchCount = 0;
int beforeLooping = pending.size();
while (pending.size() > 0) {
int allocated = 0;
+ long beforePendingLoop = System.currentTimeMillis();
for (int i = 0; i < pending.size(); i++) {
+ long pendingStart = System.currentTimeMillis();
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i);
int instructionsSize = getInstructionsSize(msg);
instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize);
int curWanted = msg.getData().length - msg.getOffset() + instructionsSize;
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("pending " + i + "/" +pending.size()
+ + " (" + msg.getMessageId() + ") curWanted=" + curWanted
+ + " instructionSize=" + instructionsSize + " allocated=" + allocated);
allocated += curWanted;
if (allocated >= FULL_SIZE) {
if (allocated - curWanted + instructionsSize >= FULL_SIZE) {
@@ -93,8 +109,10 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
long waited = _context.clock().now() - _pendingSince;
_context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited);
}
+ long beforeSend = System.currentTimeMillis();
_pendingSince = 0;
send(pending, 0, i, sender, rec);
+ long afterSend = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO))
_log.info("Allocated=" + allocated + " so we sent " + (i+1)
+ " (last complete? " + (msg.getOffset() >= msg.getData().length)
@@ -105,26 +123,40 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
if (cur.getOffset() < cur.getData().length)
throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
+ " len=" + cur.getData().length + " alloc=" + allocated);
+ if (timingBuf != null)
+ timingBuf.append(" sent " + cur);
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed allocated");
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
if (msg.getOffset() >= msg.getData().length) {
// ok, this last message fit perfectly, remove it too
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
+ if (timingBuf != null)
+ timingBuf.append(" sent perfect fit " + cur).append(".");
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending);
_context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
}
if (i > 0)
_context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0);
allocated = 0;
- // don't break - we may have enough source messages for multiple full tunnel messages
- //break;
batchCount++;
+ long pendingEnd = System.currentTimeMillis();
+ if (timingBuf != null)
+ timingBuf.append(" After sending " + (i+1) + "/"+pending.size() +" in " + (afterSend-beforeSend)
+ + " after " + (beforeSend-pendingStart)
+ + " since " + (beforeSend-beforePendingLoop)
+ + "/" + (beforeSend-start)
+ + " pending current " + (pendingEnd-pendingStart)).append(".");
+ break;
}
+ if (timingBuf != null)
+ timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append(".");
}
- display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
-
+ long afterCleared = System.currentTimeMillis();
+ if (_log.shouldLog(Log.INFO))
+ display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
+ long afterDisplayed = System.currentTimeMillis();
if (allocated > 0) {
// after going through the entire pending list, we still don't
// have enough data to send a full message
@@ -152,13 +184,26 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
_pendingSince = _context.clock().now();
_context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize);
display(allocated, pending, "flushed, some remain");
+
+ if (timingBuf != null) {
+ timingBuf.append(" flushed, some remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
+ timingBuf.append(" total time: " + (System.currentTimeMillis()-start));
+ _log.debug(timingBuf.toString());
+ }
return true;
} else {
long delayAmount = _context.clock().now() - _pendingSince;
_pendingSince = 0;
if (batchCount > 1)
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
- display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount);
+ if (_log.shouldLog(Log.INFO))
+ display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount);
+
+ if (timingBuf != null) {
+ timingBuf.append(" flushed, none remain (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
+ timingBuf.append(" total time: " + (System.currentTimeMillis()-start));
+ _log.debug(timingBuf.toString());
+ }
return false;
}
} else {
@@ -169,17 +214,31 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
_context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
// not yet time to send the delayed flush
display(allocated, pending, "dont flush");
+
+ if (timingBuf != null) {
+ timingBuf.append(" dont flush (displayed to now: " + (System.currentTimeMillis()-afterDisplayed) + ")");
+ timingBuf.append(" total time: " + (System.currentTimeMillis()-start));
+ _log.debug(timingBuf.toString());
+ }
return true;
}
} else {
// ok, we sent some, but haven't gone back for another
// pass yet. keep looping
+
+ if (timingBuf != null)
+ timingBuf.append(" Keep looping");
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sent everything on the list (pending=" + pending.size() + ")");
+ if (timingBuf != null)
+ timingBuf.append(" total time: " + (System.currentTimeMillis()-start));
+
+ if (timingBuf != null)
+ _log.debug(timingBuf.toString());
// sent everything from the pending list, no need to delayed flush
return false;
}
@@ -244,6 +303,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor {
TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i);
cur.addMessageId(msgId);
}
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Sent " + startAt + ":" + sendThrough + " out of " + pending + " in message " + msgId);
}
/**
diff --git a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java
index 6eaa59e3ba..a21261afa4 100644
--- a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java
+++ b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java
@@ -32,12 +32,22 @@ public class BuildMessageProcessor {
Log log = ctx.logManager().getLog(getClass());
BuildRequestRecord rv = null;
int ourHop = -1;
+ long beforeActualDecrypt = 0;
+ long afterActualDecrypt = 0;
+ long totalEq = 0;
+ long totalDup = 0;
+ long beforeLoop = System.currentTimeMillis();
for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) {
ByteArray rec = msg.getRecord(i);
int off = rec.getOffset();
int len = BuildRequestRecord.PEER_SIZE;
- if (DataHelper.eq(ourHash.getData(), 0, rec.getData(), off, len)) {
+ long beforeEq = System.currentTimeMillis();
+ boolean eq = DataHelper.eq(ourHash.getData(), 0, rec.getData(), off, len);
+ totalEq += System.currentTimeMillis()-beforeEq;
+ if (eq) {
+ long beforeIsDup = System.currentTimeMillis();
boolean isDup = _filter.add(rec.getData(), off + len, 32);
+ totalDup += System.currentTimeMillis()-beforeIsDup;
if (isDup) {
if (log.shouldLog(Log.WARN))
log.debug(msg.getUniqueId() + ": A record matching our hash was found, but it seems to be a duplicate");
@@ -45,7 +55,9 @@ public class BuildMessageProcessor {
return null;
}
BuildRequestRecord req = new BuildRequestRecord();
+ beforeActualDecrypt = System.currentTimeMillis();
boolean ok = req.decryptRecord(ctx, privKey, ourHash, rec);
+ afterActualDecrypt = System.currentTimeMillis();
if (ok) {
if (log.shouldLog(Log.DEBUG))
log.debug(msg.getUniqueId() + ": A record matching our hash was found and decrypted");
@@ -64,6 +76,8 @@ public class BuildMessageProcessor {
log.debug(msg.getUniqueId() + ": No records matching our hash was found");
return null;
}
+
+ long beforeEncrypt = System.currentTimeMillis();
SessionKey replyKey = rv.readReplyKey();
byte iv[] = rv.readReplyIV();
int ivOff = 0;
@@ -76,7 +90,17 @@ public class BuildMessageProcessor {
iv, ivOff, data.getValid());
}
}
+ long afterEncrypt = System.currentTimeMillis();
msg.setRecord(ourHop, null);
+ if (afterEncrypt-beforeLoop > 1000) {
+ if (log.shouldLog(Log.WARN))
+ log.warn("Slow decryption, total=" + (afterEncrypt-beforeLoop)
+ + " looping=" + (beforeEncrypt-beforeLoop)
+ + " decrypt=" + (afterActualDecrypt-beforeActualDecrypt)
+ + " eq=" + totalEq
+ + " dup=" + totalDup
+ + " encrypt=" + (afterEncrypt-beforeEncrypt));
+ }
return rv;
}
}
diff --git a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java
index f3b53eb4b6..962ebf683f 100644
--- a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java
+++ b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java
@@ -40,8 +40,8 @@ class OutboundReceiver implements TunnelGateway.Receiver {
send(msg, ri);
return msg.getUniqueId();
} else {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("lookup of " + _config.getPeer(1).toBase64().substring(0,4)
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("lookup of " + _config.getPeer(1).toBase64().substring(0,4)
+ " required for " + msg);
_context.netDb().lookupRouterInfo(_config.getPeer(1), new SendJob(_context, msg), new FailedJob(_context), 10*1000);
return -1;
@@ -50,7 +50,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
private void send(TunnelDataMessage msg, RouterInfo ri) {
if (_log.shouldLog(Log.DEBUG))
- _log.debug("forwarding encrypted data out " + _config + ": " + msg);
+ _log.debug("forwarding encrypted data out " + _config + ": " + msg.getUniqueId());
OutNetMessage m = new OutNetMessage(_context);
m.setMessage(msg);
m.setExpiration(msg.getMessageExpiration());
diff --git a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java
new file mode 100644
index 0000000000..9335ae9ee7
--- /dev/null
+++ b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java
@@ -0,0 +1,145 @@
+package net.i2p.router.tunnel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import net.i2p.I2PAppContext;
+import net.i2p.data.Hash;
+import net.i2p.data.TunnelId;
+import net.i2p.data.i2np.I2NPMessage;
+import net.i2p.data.i2np.TunnelGatewayMessage;
+import net.i2p.router.Router;
+import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer;
+
+/**
+ * Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or
+ * fragmenting them before wrapping them up for tunnel delivery. The flow here
+ * is:
+ *
add an I2NPMessage (and a target tunnel/router, if necessary)
+ *
that message is queued up into a TunnelGateway.Pending and offered to the
+ * assigned QueuePreprocessor.
+ *
that QueuePreprocessor may then take off any of the TunnelGateway.Pending
+ * messages or instruct the TunnelGateway to offer it the messages again in
+ * a short while (in an attempt to coallesce them).
+ *
when the QueueProcessor accepts a TunnelGateway.Pending, it preprocesses
+ * it into fragments, forwarding each preprocessed fragment group through
+ * the Sender.
+ *
the Sender then encrypts the preprocessed data and delivers it to the
+ * Receiver.
+ *
the Receiver now has the encrypted message and may do with it as it
+ * pleases (e.g. wrap it as necessary and enqueue it onto the OutNetMessagePool,
+ * or if debugging, verify that it can be decrypted properly)
+ *
+ *
+ */
+public class PumpedTunnelGateway extends TunnelGateway {
+ private List _prequeue;
+ private TunnelGatewayPumper _pumper;
+
+ /**
+ * @param preprocessor this pulls Pending messages off a list, builds some
+ * full preprocessed messages, and pumps those into the sender
+ * @param sender this takes a preprocessed message, encrypts it, and sends it to
+ * the receiver
+ * @param receiver this receives the encrypted message and forwards it off
+ * to the first hop
+ */
+ public PumpedTunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
+ super(context, preprocessor, sender, receiver);
+ _prequeue = new ArrayList(4);
+ _pumper = pumper;
+ }
+
+ /**
+ * Add a message to be sent down the tunnel, either sending it now (perhaps
+ * coallesced with other pending messages) or after a brief pause (_flushFrequency).
+ * If it is queued up past its expiration, it is silently dropped
+ *
+ * @param msg message to be sent through the tunnel
+ * @param toRouter router to send to after the endpoint (or null for endpoint processing)
+ * @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing)
+ */
+ public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
+ _messagesSent++;
+ long startAdd = System.currentTimeMillis();
+ Pending cur = new PendingImpl(msg, toRouter, toTunnel);
+ long beforeLock = System.currentTimeMillis();
+ long afterAdded = -1;
+ synchronized (_prequeue) {
+ _prequeue.add(cur);
+ afterAdded = System.currentTimeMillis();
+ }
+ _pumper.wantsPumping(this);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("GW prequeue time: " + (System.currentTimeMillis()-beforeLock) + " for " + msg.getUniqueId() + " on " + toString());
+ }
+
+ /**
+ * run in one of the TunnelGatewayPumper's threads, this pulls pending messages
+ * off the prequeue, adds them to the queue and then tries to preprocess the queue,
+ * scheduling a later delayed flush as necessary. this allows the gw.add call to
+ * go quickly, rather than blocking its callers on potentially substantial
+ * processing.
+ */
+ void pump(List queueBuf) {
+ synchronized (_prequeue) {
+ if (_prequeue.size() > 0) {
+ queueBuf.addAll(_prequeue);
+ _prequeue.clear();
+ } else {
+ return;
+ }
+ }
+ long startAdd = System.currentTimeMillis();
+ long beforeLock = System.currentTimeMillis();
+ long afterAdded = -1;
+ boolean delayedFlush = false;
+ long delayAmount = -1;
+ int remaining = 0;
+ long afterPreprocess = 0;
+ long afterExpire = 0;
+ synchronized (_queue) {
+ _queue.addAll(queueBuf);
+ afterAdded = System.currentTimeMillis();
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Added before direct flush preprocessing for " + toString() + ": " + _queue);
+ delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
+ afterPreprocess = System.currentTimeMillis();
+ if (delayedFlush)
+ delayAmount = _preprocessor.getDelayAmount();
+ _lastFlush = _context.clock().now();
+
+ // expire any as necessary, even if its framented
+ for (int i = 0; i < _queue.size(); i++) {
+ Pending m = (Pending)_queue.get(i);
+ if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);
+ _queue.remove(i);
+ i--;
+ }
+ }
+ afterExpire = System.currentTimeMillis();
+ remaining = _queue.size();
+ if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
+ _log.debug("Remaining after preprocessing: " + _queue);
+ }
+
+ if (delayedFlush) {
+ FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount);
+ }
+ _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
+ long complete = System.currentTimeMillis();
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
+ + " delayed? " + delayedFlush + " remaining: " + remaining
+ + " prepare: " + (beforeLock-startAdd)
+ + " add: " + (afterAdded-beforeLock)
+ + " preprocess: " + (afterPreprocess-afterAdded)
+ + " expire: " + (afterExpire-afterPreprocess)
+ + " queue flush: " + (complete-afterExpire));
+ queueBuf.clear();
+ }
+
+}
diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java
index 10a02dd865..d687df726f 100644
--- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java
+++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java
@@ -24,7 +24,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
public static final int PREPROCESSED_SIZE = 1024;
protected static final int IV_SIZE = HopProcessor.IV_LENGTH;
- protected static final ByteCache _dataCache = ByteCache.getInstance(512, PREPROCESSED_SIZE);
+ protected static final ByteCache _dataCache = ByteCache.getInstance(32, PREPROCESSED_SIZE);
protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
protected static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH);
@@ -42,14 +42,30 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
*
*/
public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
+ long begin = System.currentTimeMillis();
+ StringBuffer buf = null;
+ if (_log.shouldLog(Log.DEBUG)) {
+ buf = new StringBuffer(256);
+ buf.append("Trivial preprocessing of ").append(pending.size()).append(" ");
+ }
while (pending.size() > 0) {
TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.remove(0);
+ long beforePreproc = System.currentTimeMillis();
byte preprocessed[][] = preprocess(msg);
+ long afterPreproc = System.currentTimeMillis();
+ if (buf != null)
+ buf.append("preprocessed into " + preprocessed.length + " fragments after " + (afterPreproc-beforePreproc) + ". ");
for (int i = 0; i < preprocessed.length; i++) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Preprocessed: fragment " + i + "/" + (preprocessed.length-1) + " in "
- + msg.getMessageId() + ": " + Base64.encode(preprocessed[i]));
+ + msg.getMessageId() + ": "
+ + " send through " + sender + " receive with " + rec);
+ //Base64.encode(preprocessed[i]));
+ long beforeSend = System.currentTimeMillis();
long id = sender.sendPreprocessed(preprocessed[i], rec);
+ long afterSend = System.currentTimeMillis();
+ if (buf != null)
+ buf.append("send of " + msg.getMessageId() + " took " + (afterSend-beforeSend) + ". ");
msg.addMessageId(id);
}
notePreprocessing(msg.getMessageId(), msg.getFragmentNumber(), preprocessed.length, msg.getMessageIds(), null);
@@ -58,6 +74,12 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
+ msg.getFragmentNumber() + "/" + preprocessed.length + " fragments, size = "
+ msg.getData().length);
}
+ if (buf != null)
+ buf.append("all fragments sent after " + (System.currentTimeMillis()-afterPreproc) + ". ");
+ }
+ if (buf != null) {
+ buf.append("queue preprocessed after " + (System.currentTimeMillis()-begin) + ".");
+ _log.debug(buf.toString());
}
return false;
}
@@ -69,8 +91,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
while (msg.getOffset() < msg.getData().length) {
fragments.add(preprocessFragment(msg));
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("\n\nafter preprocessing fragment\n\n");
+ //if (_log.shouldLog(Log.DEBUG))
+ // _log.debug("\n\nafter preprocessing fragment\n\n");
}
byte rv[][] = new byte[fragments.size()][];
@@ -236,7 +258,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
if (_log.shouldLog(Log.DEBUG))
_log.debug("initial fragment[" + msg.getMessageId() + "/" + msg.getFragmentNumber()+ "/"
+ (PREPROCESSED_SIZE - offset - payloadLength) + "/" + payloadLength + "]: "
- + Base64.encode(target, offset, payloadLength));
+ );
+ //+ Base64.encode(target, offset, payloadLength));
offset += payloadLength;
@@ -277,7 +300,8 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
if (_log.shouldLog(Log.DEBUG))
_log.debug("subsequent fragment[" + msg.getMessageId() + "/" + msg.getFragmentNumber()+ "/"
+ offset + "/" + payloadLength + "]: "
- + Base64.encode(target, offset, payloadLength));
+ );
+ //+ Base64.encode(target, offset, payloadLength));
offset += payloadLength;
diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java
index 2805ff95a0..c7dbd82537 100644
--- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java
+++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java
@@ -40,6 +40,7 @@ public class TunnelDispatcher implements Service {
private LeaveTunnel _leaveJob;
/** what is the date/time we last deliberately dropped a tunnel? **/
private long _lastDropTime;
+ private TunnelGatewayPumper _pumper;
/** Creates a new instance of TunnelDispatcher */
public TunnelDispatcher(RouterContext ctx) {
@@ -53,6 +54,7 @@ public class TunnelDispatcher implements Service {
_lastParticipatingExpiration = 0;
_lastDropTime = 0;
_validator = null;
+ _pumper = new TunnelGatewayPumper(ctx);
_leaveJob = new LeaveTunnel(ctx);
ctx.statManager().createRateStat("tunnel.participatingTunnels",
"How many tunnels are we participating in?", "Tunnels",
@@ -142,7 +144,8 @@ public class TunnelDispatcher implements Service {
TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg);
TunnelGateway.Sender sender = new OutboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new OutboundReceiver(_context, cfg);
- TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
+ //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
+ TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
TunnelId outId = cfg.getConfig(0).getSendTunnel();
synchronized (_outboundGateways) {
_outboundGateways.put(outId, gw);
@@ -245,7 +248,8 @@ public class TunnelDispatcher implements Service {
TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg);
TunnelGateway.Sender sender = new InboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg);
- TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
+ //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
+ TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
TunnelId recvId = cfg.getReceiveTunnel();
synchronized (_inboundGateways) {
_inboundGateways.put(recvId, gw);
@@ -367,7 +371,7 @@ public class TunnelDispatcher implements Service {
*
*/
public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
- long before = _context.clock().now();
+ long before = System.currentTimeMillis();
TunnelParticipant participant = null;
synchronized (_participants) {
participant = (TunnelParticipant)_participants.get(msg.getTunnelIdObj());
@@ -404,7 +408,9 @@ public class TunnelDispatcher implements Service {
}
}
- long dispatchTime = _context.clock().now() - before;
+ long dispatchTime = System.currentTimeMillis() - before;
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Dispatch data time: " + dispatchTime + " participant? " + participant);
_context.statManager().addRateData("tunnel.dispatchDataTime", dispatchTime, dispatchTime);
}
@@ -414,7 +420,7 @@ public class TunnelDispatcher implements Service {
*
*/
public void dispatch(TunnelGatewayMessage msg) {
- long before = _context.clock().now();
+ long before = System.currentTimeMillis();
TunnelGateway gw = null;
synchronized (_inboundGateways) {
gw = (TunnelGateway)_inboundGateways.get(msg.getTunnelId());
@@ -451,7 +457,10 @@ public class TunnelDispatcher implements Service {
+ " existing = " + _inboundGateways.size(), new Exception("source"));
}
- long dispatchTime = _context.clock().now() - before;
+ long dispatchTime = System.currentTimeMillis() - before;
+
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Dispatch in gw time: " + dispatchTime + " gateway? " + gw);
_context.statManager().addRateData("tunnel.dispatchGatewayTime", dispatchTime, dispatchTime);
}
@@ -519,6 +528,9 @@ public class TunnelDispatcher implements Service {
}
long dispatchTime = _context.clock().now() - before;
+ if (dispatchTime > 1000) {
+ _log.error("wtf, took " + dispatchTime + " to dispatch " + msg + " out " + outboundTunnel + " in " + gw);
+ }
if (gw instanceof TunnelGatewayZeroHop)
_context.statManager().addRateData("tunnel.dispatchOutboundZeroHopTime", dispatchTime, dispatchTime);
else
@@ -603,6 +615,7 @@ public class TunnelDispatcher implements Service {
if (_validator != null)
_validator.destroy();
_validator = null;
+ _pumper.stopPumping();
}
public void restart() {
shutdown();
diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java
index 0646ce870c..a9ed0cc1e7 100644
--- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java
+++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java
@@ -34,16 +34,16 @@ import net.i2p.util.SimpleTimer;
*
*/
public class TunnelGateway {
- private I2PAppContext _context;
- private Log _log;
- private List _queue;
- private QueuePreprocessor _preprocessor;
- private Sender _sender;
- private Receiver _receiver;
- private long _lastFlush;
- private int _flushFrequency;
- private DelayedFlush _delayedFlush;
- private int _messagesSent;
+ protected I2PAppContext _context;
+ protected Log _log;
+ protected List _queue;
+ protected QueuePreprocessor _preprocessor;
+ protected Sender _sender;
+ protected Receiver _receiver;
+ protected long _lastFlush;
+ protected int _flushFrequency;
+ protected DelayedFlush _delayedFlush;
+ protected int _messagesSent;
/**
* @param preprocessor this pulls Pending messages off a list, builds some
@@ -55,7 +55,7 @@ public class TunnelGateway {
*/
public TunnelGateway(I2PAppContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
_context = context;
- _log = context.logManager().getLog(TunnelGateway.class);
+ _log = context.logManager().getLog(getClass());
_queue = new ArrayList(4);
_preprocessor = preprocessor;
_sender = sender;
@@ -88,19 +88,22 @@ public class TunnelGateway {
*/
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
_messagesSent++;
+ long startAdd = System.currentTimeMillis();
boolean delayedFlush = false;
long delayAmount = -1;
-
int remaining = 0;
- long beforeLock = _context.clock().now();
- long afterAdded = -1;
Pending cur = new PendingImpl(msg, toRouter, toTunnel);
+ long beforeLock = System.currentTimeMillis();
+ long afterAdded = -1;
+ long afterPreprocess = 0;
+ long afterExpire = 0;
synchronized (_queue) {
_queue.add(cur);
- afterAdded = _context.clock().now();
+ afterAdded = System.currentTimeMillis();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Added before direct flush preprocessing: " + _queue);
delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
+ afterPreprocess = System.currentTimeMillis();
if (delayedFlush)
delayAmount = _preprocessor.getDelayAmount();
_lastFlush = _context.clock().now();
@@ -115,6 +118,7 @@ public class TunnelGateway {
i--;
}
}
+ afterExpire = System.currentTimeMillis();
remaining = _queue.size();
if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Remaining after preprocessing: " + _queue);
@@ -124,6 +128,15 @@ public class TunnelGateway {
FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount);
}
_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
+ long complete = System.currentTimeMillis();
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Time to add the message " + msg.getUniqueId() + ": " + (complete-startAdd)
+ + " delayed? " + delayedFlush + " remaining: " + remaining
+ + " prepare: " + (beforeLock-startAdd)
+ + " add: " + (afterAdded-beforeLock)
+ + " preprocess: " + (afterPreprocess-afterAdded)
+ + " expire: " + (afterExpire-afterPreprocess)
+ + " queue flush: " + (complete-afterExpire));
}
public int getMessagesSent() { return _messagesSent; }
@@ -163,7 +176,7 @@ public class TunnelGateway {
public static class Pending {
protected Hash _toRouter;
protected TunnelId _toTunnel;
- private long _messageId;
+ protected long _messageId;
protected long _expiration;
protected byte _remaining[];
protected int _offset;
@@ -218,14 +231,14 @@ public class TunnelGateway {
}
}
}
- private class PendingImpl extends Pending {
+ class PendingImpl extends Pending {
public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel, _context.clock().now());
}
public String toString() {
StringBuffer buf = new StringBuffer(64);
- buf.append("Message on ");
+ buf.append("Message ").append(_messageId).append(" on ");
buf.append(TunnelGateway.this.toString());
if (_toRouter != null) {
buf.append(" targetting ");
diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
new file mode 100644
index 0000000000..494164726a
--- /dev/null
+++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
@@ -0,0 +1,57 @@
+package net.i2p.router.tunnel;
+
+import java.util.*;
+import net.i2p.router.RouterContext;
+import net.i2p.util.I2PThread;
+import net.i2p.util.Log;
+
+/**
+ * run through the tunnel gateways that have had messages added to them and push
+ * those messages through the preprocessing and sending process
+ */
+public class TunnelGatewayPumper implements Runnable {
+ private RouterContext _context;
+ private Log _log;
+ private List _wantsPumping;
+ private boolean _stop;
+
+ /** Creates a new instance of TunnelGatewayPumper */
+ public TunnelGatewayPumper(RouterContext ctx) {
+ _context = ctx;
+ _log = ctx.logManager().getLog(getClass());
+ _wantsPumping = new ArrayList(64);
+ _stop = false;
+ for (int i = 0; i < 4; i++)
+ new I2PThread(this, "GW pumper " + i, true).start();
+ }
+ public void stopPumping() {
+ _stop=true;
+ synchronized (_wantsPumping) { _wantsPumping.notifyAll(); }
+ }
+
+ public void wantsPumping(PumpedTunnelGateway gw) {
+ synchronized (_wantsPumping) {
+ _wantsPumping.add(gw);
+ _wantsPumping.notify();
+ }
+ }
+
+ public void run() {
+ PumpedTunnelGateway gw = null;
+ List queueBuf = new ArrayList(32);
+ while (!_stop) {
+ try {
+ synchronized (_wantsPumping) {
+ if (_wantsPumping.size() > 0)
+ gw = (PumpedTunnelGateway)_wantsPumping.remove(0);
+ else
+ _wantsPumping.wait();
+ }
+ } catch (InterruptedException ie) {}
+ if (gw != null) {
+ gw.pump(queueBuf);
+ gw = null;
+ }
+ }
+ }
+}
diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java
index 45f980c943..7a6898e88e 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java
+++ b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java
@@ -102,8 +102,8 @@ class BuildExecutor implements Runnable {
}
}
- if (buf != null)
- _log.debug(buf.toString());
+ //if (buf != null)
+ // _log.debug(buf.toString());
_context.statManager().addRateData("tunnel.concurrentBuilds", concurrent, 0);
@@ -217,15 +217,15 @@ class BuildExecutor implements Runnable {
// allowed() also expires timed out requests (for new style requests)
int allowed = allowed();
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Allowed: " + allowed + " wanted: " + wanted);
+ //if (_log.shouldLog(Log.DEBUG))
+ // _log.debug("Allowed: " + allowed + " wanted: " + wanted);
// zero hop ones can run inline
allowed = buildZeroHopTunnels(wanted, allowed);
afterBuildZeroHop = System.currentTimeMillis();
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Zero hops built, Allowed: " + allowed + " wanted: " + wanted);
+ //if (_log.shouldLog(Log.DEBUG))
+ // _log.debug("Zero hops built, Allowed: " + allowed + " wanted: " + wanted);
int realBuilt = 0;
TunnelManagerFacade mgr = _context.tunnelManager();
@@ -243,10 +243,10 @@ class BuildExecutor implements Runnable {
if ( (allowed > 0) && (wanted.size() > 0) ) {
Collections.shuffle(wanted, _context.random());
- // force the loops to be short, since 20 consecutive tunnel build requests can take
+ // force the loops to be short, since 3 consecutive tunnel build requests can take
// a long, long time
- if (allowed > 5)
- allowed = 5;
+ if (allowed > 2)
+ allowed = 2;
for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) {
TunnelPool pool = (TunnelPool)wanted.remove(0);
@@ -275,10 +275,10 @@ class BuildExecutor implements Runnable {
try {
synchronized (_currentlyBuilding) {
if (!_repoll) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Nothin' doin (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while");
+ //if (_log.shouldLog(Log.DEBUG))
+ // _log.debug("Nothin' doin (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while");
//if (allowed <= 0)
- _currentlyBuilding.wait(_context.random().nextInt(1*1000));
+ _currentlyBuilding.wait(_context.random().nextInt(2*1000));
//else // wanted <= 0
// _currentlyBuilding.wait(_context.random().nextInt(30*1000));
}
@@ -297,14 +297,14 @@ class BuildExecutor implements Runnable {
if (pendingRemaining > 0)
_context.statManager().addRateData("tunnel.pendingRemaining", pendingRemaining, afterHandleInbound-afterBuildReal);
- if (_log.shouldLog(Log.INFO))
- _log.info("build loop complete, tot=" + (afterHandleInbound-loopBegin) +
- " inReply=" + (afterHandleInboundReplies-beforeHandleInboundReplies) +
- " zeroHop=" + (afterBuildZeroHop-afterHandleInboundReplies) +
- " real=" + (afterBuildReal-afterBuildZeroHop) +
- " in=" + (afterHandleInbound-afterBuildReal) +
- " built=" + realBuilt +
- " pending=" + pendingRemaining);
+ //if (_log.shouldLog(Log.DEBUG))
+ // _log.debug("build loop complete, tot=" + (afterHandleInbound-loopBegin) +
+ // " inReply=" + (afterHandleInboundReplies-beforeHandleInboundReplies) +
+ // " zeroHop=" + (afterBuildZeroHop-afterHandleInboundReplies) +
+ // " real=" + (afterBuildReal-afterBuildZeroHop) +
+ // " in=" + (afterHandleInbound-afterBuildReal) +
+ // " built=" + realBuilt +
+ // " pending=" + pendingRemaining);
wanted.clear();
pools.clear();
diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java
index 008fc7d831..fb45c642ac 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java
+++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java
@@ -65,7 +65,7 @@ class BuildHandler {
ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildReplyMessage.MESSAGE_TYPE, new TunnelBuildReplyMessageHandlerJobBuilder());
}
- private static final int MAX_HANDLE_AT_ONCE = 5;
+ private static final int MAX_HANDLE_AT_ONCE = 2;
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 5*1000;
/**
@@ -74,7 +74,9 @@ class BuildHandler {
*/
int handleInboundRequests() {
int dropExpired = 0;
+ int remaining = 0;
List handled = null;
+ long beforeFindHandled = System.currentTimeMillis();
synchronized (_inboundBuildMessages) {
int toHandle = _inboundBuildMessages.size();
if (toHandle > 0) {
@@ -107,14 +109,18 @@ class BuildHandler {
handled.add(_inboundBuildMessages.remove(0));
}
}
+ remaining = _inboundBuildMessages.size();
}
if (handled != null) {
if (_log.shouldLog(Log.DEBUG))
- _log.debug("Handling " + handled.size() + " requests");
+ _log.debug("Handling " + handled.size() + " requests (took " + (System.currentTimeMillis()-beforeFindHandled) + "ms to find them)");
for (int i = 0; i < handled.size(); i++) {
BuildMessageState state = (BuildMessageState)handled.get(i);
- handleRequest(state);
+ long beforeHandle = System.currentTimeMillis();
+ long actualTime = handleRequest(state);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime + " (" + i + " out of " + handled.size() + " with " + remaining + " remaining)");
}
handled.clear();
}
@@ -140,12 +146,15 @@ class BuildHandler {
}
// anything else?
+ /*
synchronized (_inboundBuildMessages) {
int remaining = _inboundBuildMessages.size();
- if (remaining > 0)
- _context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
return remaining;
}
+ */
+ if (remaining > 0)
+ _context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
+ return remaining;
}
void handleInboundReplies() {
@@ -273,7 +282,7 @@ class BuildHandler {
}
}
- private void handleRequest(BuildMessageState state) {
+ private long handleRequest(BuildMessageState state) {
long timeSinceReceived = System.currentTimeMillis()-state.recvTime;
if (_log.shouldLog(Log.DEBUG))
_log.debug(state.msg.getUniqueId() + ": handling request after " + timeSinceReceived);
@@ -284,7 +293,7 @@ class BuildHandler {
_log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
+ ", since we received it a long time ago: " + timeSinceReceived);
_context.statManager().addRateData("tunnel.dropLoadDelay", timeSinceReceived, 0);
- return;
+ return -1;
}
// ok, this is not our own tunnel, so we need to do some heavy lifting
// this not only decrypts the current hop's record, but encrypts the other records
@@ -293,24 +302,37 @@ class BuildHandler {
BuildRequestRecord req = _processor.decrypt(_context, state.msg, _context.routerHash(), _context.keyManager().getPrivateKey());
long decryptTime = System.currentTimeMillis() - beforeDecrypt;
_context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime, decryptTime);
+ if (decryptTime > 500)
+ _log.warn("Took too long to decrypt the request: " + decryptTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
if (req == null) {
// no records matched, or the decryption failed. bah
if (_log.shouldLog(Log.WARN))
_log.warn("The request " + state.msg.getUniqueId() + " could not be decrypted");
- return;
+ return -1;
}
+ long beforeLookup = System.currentTimeMillis();
Hash nextPeer = req.readNextIdentity();
+ long readPeerTime = System.currentTimeMillis()-beforeLookup;
RouterInfo nextPeerInfo = _context.netDb().lookupRouterInfoLocally(nextPeer);
+ long lookupTime = System.currentTimeMillis()-beforeLookup;
+ if (lookupTime > 500)
+ _log.warn("Took too long to lookup the request: " + lookupTime + "/" + readPeerTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
if (nextPeerInfo == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + state.msg.getUniqueId() + "/" + req.readReceiveTunnelId() + "/" + req.readNextTunnelId()
+ " handled, looking for the next peer " + nextPeer.toBase64());
_context.netDb().lookupRouterInfo(nextPeer, new HandleReq(_context, state, req, nextPeer), new TimeoutReq(_context, state, req, nextPeer), NEXT_HOP_LOOKUP_TIMEOUT);
+ return -1;
} else {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Request " + state.msg.getUniqueId() + " handled and we know the next peer " + nextPeer.toBase64());
+ long beforeHandle = System.currentTimeMillis();
handleReq(nextPeerInfo, state, req, nextPeer);
+ long handleTime = System.currentTimeMillis() - beforeHandle;
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Request " + state.msg.getUniqueId() + " handled and we know the next peer "
+ + nextPeer.toBase64() + " after " + handleTime
+ + "/" + decryptTime + "/" + lookupTime + "/" + timeSinceReceived);
+ return handleTime;
}
}
@@ -488,9 +510,9 @@ class BuildHandler {
if (state.msg.getRecord(j) == null) {
ourSlot = j;
state.msg.setRecord(j, new ByteArray(reply));
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Full reply record for slot " + ourSlot + "/" + ourId + "/" + nextId + "/" + req.readReplyMessageId()
- + ": " + Base64.encode(reply));
+ //if (_log.shouldLog(Log.DEBUG))
+ // _log.debug("Full reply record for slot " + ourSlot + "/" + ourId + "/" + nextId + "/" + req.readReplyMessageId()
+ // + ": " + Base64.encode(reply));
break;
}
}
@@ -579,7 +601,7 @@ class BuildHandler {
_log.debug("Receive tunnel build message " + reqId + " from "
+ (from != null ? from.calculateHash().toBase64() : fromHash != null ? fromHash.toBase64() : "tunnels")
+ ", waiting ids: " + ids + ", found matching tunnel? " + (cfg != null),
- new Exception("source"));
+ null);//new Exception("source"));
if (cfg != null) {
BuildEndMessageState state = new BuildEndMessageState(cfg, receivedMessage, from, fromHash);
if (HANDLE_REPLIES_INLINE) {
diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java
index cd117d17df..d1eb1e0772 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java
+++ b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java
@@ -92,7 +92,9 @@ class BuildRequestor {
return;
}
+ long beforeCreate = System.currentTimeMillis();
TunnelBuildMessage msg = createTunnelBuildMessage(ctx, pool, cfg, pairedTunnel, exec);
+ long createTime = System.currentTimeMillis()-beforeCreate;
if (msg == null) {
if (log.shouldLog(Log.ERROR))
log.error("Tunnel build failed, as we couldn't create the tunnel build message for " + cfg);
@@ -102,9 +104,10 @@ class BuildRequestor {
cfg.setPairedTunnel(pairedTunnel);
+ long beforeDispatch = System.currentTimeMillis();
if (cfg.isInbound()) {
if (log.shouldLog(Log.DEBUG))
- log.debug("Sending the tunnel build request out the tunnel " + pairedTunnel + " to "
+ log.debug("Sending the tunnel build request " + msg.getUniqueId() + " out the tunnel " + pairedTunnel + " to "
+ cfg.getPeer(0).toBase64() + " for " + cfg + " waiting for the reply of "
+ cfg.getReplyMessageId());
// send it out a tunnel targetting the first hop
@@ -129,6 +132,9 @@ class BuildRequestor {
outMsg.setTarget(peer);
ctx.outNetMessagePool().add(outMsg);
}
+ if (log.shouldLog(Log.DEBUG))
+ log.debug("Tunnel build message " + msg.getUniqueId() + " created in " + createTime
+ + "ms and dispatched in " + (System.currentTimeMillis()-beforeDispatch));
}
private static TunnelBuildMessage createTunnelBuildMessage(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg, TunnelInfo pairedTunnel, BuildExecutor exec) {
diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java
index ad91b74ff1..92b772b958 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java
+++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java
@@ -510,19 +510,22 @@ public class TunnelPoolManager implements TunnelManagerFacade {
processedOut += info.getProcessedMessagesCount();
}
out.write("\n");
- List pending = in.listPending();
- for (int i = 0; i < pending.size(); i++) {
- TunnelInfo info = (TunnelInfo)pending.get(i);
- out.write("In progress: " + info.toString() + " \n");
+ if (in != null) {
+ List pending = in.listPending();
+ for (int i = 0; i < pending.size(); i++) {
+ TunnelInfo info = (TunnelInfo)pending.get(i);
+ out.write("In progress: " + info.toString() + " \n");
+ }
+ live += pending.size();
}
- live += pending.size();
- pending = outPool.listPending();
- for (int i = 0; i < pending.size(); i++) {
- TunnelInfo info = (TunnelInfo)pending.get(i);
- out.write("In progress: " + info.toString() + " \n");
+ if (outPool != null) {
+ List pending = outPool.listPending();
+ for (int i = 0; i < pending.size(); i++) {
+ TunnelInfo info = (TunnelInfo)pending.get(i);
+ out.write("In progress: " + info.toString() + " \n");
+ }
+ live += pending.size();
}
- live += pending.size();
-
if (live <= 0)
out.write("No tunnels, waiting for the grace period to end \n");
out.write("Lifetime bandwidth usage: " + processedIn + "KB in, " + processedOut + "KB out ");