From df4143f036a08c67600ef54e1ac511ba98b8b101 Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 25 Aug 2009 13:12:24 +0000 Subject: [PATCH 01/14] * NetDb: - oops, store leaseset locally even when shutting down (fix -16) - Java 5 cleanups * PRNG: - Rename config option to prng.buffers (was router.prng.buffers) - Change the default from 16 to 2 for I2PAppContext (saves 3.5MB) * TunnelPool: - Don't test tunnels when shutting down - Less rates - Java 5 cleanups --- android/res/raw/router_config | 2 +- .../crypto/prng/AsyncFortunaStandalone.java | 8 ++- .../src/net/i2p/client/I2PSessionImpl2.java | 2 + router/java/src/net/i2p/router/Router.java | 1 + .../src/net/i2p/router/RouterContext.java | 11 +++- .../KademliaNetworkDatabaseFacade.java | 18 ++--- .../net/i2p/router/tunnel/pool/TestJob.java | 17 +++-- .../router/tunnel/pool/TunnelPoolManager.java | 66 ++++++++++--------- 8 files changed, 75 insertions(+), 50 deletions(-) diff --git a/android/res/raw/router_config b/android/res/raw/router_config index cf63ed56a..3cc52bd63 100644 --- a/android/res/raw/router_config +++ b/android/res/raw/router_config @@ -3,7 +3,7 @@ i2p.dir.temp=/data/data/net.i2p.router/files/tmp i2p.dir.pid=/data/data/net.i2p.router/files/tmp # save memory -router.prng.buffers=2 +prng.buffers=2 router.decayingBloomFilterM=20 stat.full=false i2np.udp.maxConnections=30 diff --git a/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java b/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java index 0b1f88765..c37ada8bc 100644 --- a/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java +++ b/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java @@ -12,7 +12,11 @@ import net.i2p.util.Log; * has been eaten) */ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnable { - private static final int DEFAULT_BUFFERS = 16; + /** + * This is set to 2 to minimize memory usage for standalone apps. + * The router must override this via the prng.buffers property in the router context. + */ + private static final int DEFAULT_BUFFERS = 2; private static final int BUFSIZE = 256*1024; private int _bufferCount; private final byte asyncBuffers[][]; @@ -28,7 +32,7 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl public AsyncFortunaStandalone(I2PAppContext context) { super(); - _bufferCount = context.getProperty("router.prng.buffers", DEFAULT_BUFFERS); + _bufferCount = Math.max(context.getProperty("prng.buffers", DEFAULT_BUFFERS), 2); asyncBuffers = new byte[_bufferCount][BUFSIZE]; status = new int[_bufferCount]; for (int i = 0; i < _bufferCount; i++) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 508057c2c..981ccfae2 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -91,6 +91,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { * Perhaps the http server (which does its own compression) * and P2P apps (with generally uncompressible data) should * set to false. + * + * Todo: don't compress if destination is local? */ private static final int DONT_COMPRESS_SIZE = 66; protected boolean shouldCompress(int size) { diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 1b20bda9d..6e52f2bc5 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -330,6 +330,7 @@ public class Router { _context.blocklist().startup(); // let the timestamper get us sync'ed + // this will block for quite a while on a disconnected machine long before = System.currentTimeMillis(); _context.clock().getTimestamper().waitForInitialization(); long waited = System.currentTimeMillis() - before; diff --git a/router/java/src/net/i2p/router/RouterContext.java b/router/java/src/net/i2p/router/RouterContext.java index 467338eb2..2a3410348 100644 --- a/router/java/src/net/i2p/router/RouterContext.java +++ b/router/java/src/net/i2p/router/RouterContext.java @@ -75,19 +75,28 @@ public class RouterContext extends I2PAppContext { //initAll(); _contexts.add(this); } + /** + * Set properties where the defaults must be different from those + * in I2PAppContext. + * * Unless we are explicitly disabling the timestamper, we want to use it. * We need this now as the new timestamper default is disabled (so we don't * have each I2PAppContext creating their own SNTP queries all the time) * + * Set more PRNG buffers, as the default is now small for the I2PAppContext. + * */ - static final Properties filterProps(Properties envProps) { + private static final Properties filterProps(Properties envProps) { if (envProps == null) envProps = new Properties(); if (envProps.getProperty("time.disabled") == null) envProps.setProperty("time.disabled", "false"); + if (envProps.getProperty("prng.buffers") == null) + envProps.setProperty("prng.buffers", "16"); return envProps; } + public void initAll() { if ("false".equals(getProperty("i2p.dummyClientFacade", "false"))) _clientManagerFacade = new ClientManagerFacadeImpl(this); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index ff53f7d15..aafd45e7e 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -56,7 +56,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { private DataStore _ds; // hash to DataStructure mapping, persisted when necessary /** where the data store is pushing the data */ private String _dbDir; - private final Set _exploreKeys = new HashSet(64); // set of Hash objects that we should search on (to fill up a bucket, not to get data) + private final Set _exploreKeys = new HashSet(64); // set of Hash objects that we should search on (to fill up a bucket, not to get data) private boolean _initialized; /** Clock independent time of when we started up */ private long _started; @@ -72,7 +72,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { * removed when the job decides to stop running. * */ - private final Map _publishingLeaseSets; + private final Map _publishingLeaseSets; /** * Hash of the key currently being searched for, pointing the SearchJob that @@ -80,7 +80,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { * added on to the list of jobs fired on success/failure * */ - private final Map _activeRequests; + private final Map _activeRequests; /** * The search for the given key is no longer active @@ -160,7 +160,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _exploreJob.updateExploreSchedule(); } - public Set getExploreKeys() { + public Set getExploreKeys() { if (!_initialized) return null; synchronized (_exploreKeys) { return new HashSet(_exploreKeys); @@ -302,12 +302,12 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { /** * Get the routers closest to that key in response to a remote lookup */ - public Set findNearestRouters(Hash key, int maxNumRouters, Set peersToIgnore) { + public Set findNearestRouters(Hash key, int maxNumRouters, Set peersToIgnore) { if (!_initialized) return null; return getRouters(_peerSelector.selectNearest(key, maxNumRouters, peersToIgnore, _kb)); } - private Set getRouters(Collection hashes) { + private Set getRouters(Collection hashes) { if (!_initialized) return null; Set rv = new HashSet(hashes.size()); for (Iterator iter = hashes.iterator(); iter.hasNext(); ) { @@ -481,8 +481,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { private static final long PUBLISH_DELAY = 3*1000; public void publish(LeaseSet localLeaseSet) { if (!_initialized) return; - if (_context.router().gracefulShutdownInProgress()) - return; Hash h = localLeaseSet.getDestination().calculateHash(); try { store(h, localLeaseSet); @@ -492,6 +490,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } if (!_context.clientManager().shouldPublishLeaseSet(h)) return; + if (_context.router().gracefulShutdownInProgress()) + return; RepublishLeaseSetJob j = null; synchronized (_publishingLeaseSets) { @@ -855,7 +855,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } return leases; } - private Set getRouters() { + private Set getRouters() { if (!_initialized) return null; Set routers = new HashSet(); Set keys = getDataStore().getKeys(); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java index c0633ef9f..fc79c87f2 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java @@ -31,6 +31,7 @@ class TestJob extends JobImpl { /** base to randomize the test delay on */ private static final int TEST_DELAY = 30*1000; + private static final long[] RATES = { 60*1000, 10*60*1000l, 60*60*1000l }; public TestJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, TunnelPool pool) { super(ctx); @@ -43,19 +44,19 @@ class TestJob extends JobImpl { _log.error("Invalid tunnel test configuration: no pool for " + cfg, new Exception("origin")); getTiming().setStartAfter(getDelay() + ctx.clock().now()); ctx.statManager().createRateStat("tunnel.testFailedTime", "How long did the failure take (max of 60s for full timeout)?", "Tunnels", - new long[] { 60*1000, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + RATES); ctx.statManager().createRateStat("tunnel.testExploratoryFailedTime", "How long did the failure of an exploratory tunnel take (max of 60s for full timeout)?", "Tunnels", - new long[] { 60*1000, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + RATES); ctx.statManager().createRateStat("tunnel.testFailedCompletelyTime", "How long did the complete failure take (max of 60s for full timeout)?", "Tunnels", - new long[] { 60*1000, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + RATES); ctx.statManager().createRateStat("tunnel.testExploratoryFailedCompletelyTime", "How long did the complete failure of an exploratory tunnel take (max of 60s for full timeout)?", "Tunnels", - new long[] { 60*1000, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + RATES); ctx.statManager().createRateStat("tunnel.testSuccessLength", "How long were the tunnels that passed the test?", "Tunnels", - new long[] { 60*1000, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + RATES); ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long did tunnel testing take?", "Tunnels", - new long[] { 60*1000, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + RATES); ctx.statManager().createRateStat("tunnel.testAborted", "Tunnel test could not occur, since there weren't any tunnels to test with", "Tunnels", - new long[] { 60*1000, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + RATES); } public String getName() { return "Test tunnel"; } public void runJob() { @@ -69,6 +70,8 @@ class TestJob extends JobImpl { scheduleRetest(); return; } + if (getContext().router().gracefulShutdownInProgress()) + return; // don't reschedule _found = false; // note: testing with exploratory tunnels always, even if the tested tunnel // is a client tunnel (per _cfg.getDestination()) diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 6bd7ba646..a83b78aaa 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -37,9 +37,9 @@ public class TunnelPoolManager implements TunnelManagerFacade { private RouterContext _context; private Log _log; /** Hash (destination) to TunnelPool */ - private final Map _clientInboundPools; + private final Map _clientInboundPools; /** Hash (destination) to TunnelPool */ - private final Map _clientOutboundPools; + private final Map _clientOutboundPools; private TunnelPool _inboundExploratory; private TunnelPool _outboundExploratory; private BuildExecutor _executor; @@ -90,7 +90,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (destination == null) return selectInboundTunnel(); TunnelPool pool = null; synchronized (_clientInboundPools) { - pool = (TunnelPool)_clientInboundPools.get(destination); + pool = _clientInboundPools.get(destination); } if (pool != null) { return pool.selectTunnel(); @@ -119,7 +119,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (destination == null) return selectOutboundTunnel(); TunnelPool pool = null; synchronized (_clientOutboundPools) { - pool = (TunnelPool)_clientOutboundPools.get(destination); + pool = _clientOutboundPools.get(destination); } if (pool != null) { return pool.selectTunnel(); @@ -130,8 +130,8 @@ public class TunnelPoolManager implements TunnelManagerFacade { public TunnelInfo getTunnelInfo(TunnelId id) { TunnelInfo info = null; synchronized (_clientInboundPools) { - for (Iterator iter = _clientInboundPools.values().iterator(); iter.hasNext(); ) { - TunnelPool pool = (TunnelPool)iter.next(); + for (Iterator iter = _clientInboundPools.values().iterator(); iter.hasNext(); ) { + TunnelPool pool = iter.next(); info = pool.getTunnel(id); if (info != null) return info; @@ -166,7 +166,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { Hash client = (Hash)destinations.get(i); TunnelPool pool = null; synchronized (_clientInboundPools) { - pool = (TunnelPool)_clientInboundPools.get(client); + pool = _clientInboundPools.get(client); } count += pool.listTunnels().size(); } @@ -182,7 +182,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { Hash client = (Hash)destinations.get(i); TunnelPool pool = null; synchronized (_clientOutboundPools) { - pool = (TunnelPool)_clientOutboundPools.get(client); + pool = _clientOutboundPools.get(client); } count += pool.listTunnels().size(); } @@ -196,9 +196,9 @@ public class TunnelPoolManager implements TunnelManagerFacade { return false; TunnelPool pool; if (tunnel.isInbound()) - pool = (TunnelPool)_clientInboundPools.get(client); + pool = _clientInboundPools.get(client); else - pool = (TunnelPool)_clientOutboundPools.get(client); + pool = _clientOutboundPools.get(client); if (pool == null) return false; return pool.listTunnels().contains(tunnel); @@ -211,7 +211,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { public TunnelPoolSettings getInboundSettings(Hash client) { TunnelPool pool = null; synchronized (_clientInboundPools) { - pool = (TunnelPool)_clientInboundPools.get(client); + pool = _clientInboundPools.get(client); } if (pool != null) return pool.getSettings(); @@ -221,7 +221,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { public TunnelPoolSettings getOutboundSettings(Hash client) { TunnelPool pool = null; synchronized (_clientOutboundPools) { - pool = (TunnelPool)_clientOutboundPools.get(client); + pool = _clientOutboundPools.get(client); } if (pool != null) return pool.getSettings(); @@ -234,10 +234,10 @@ public class TunnelPoolManager implements TunnelManagerFacade { public void setOutboundSettings(Hash client, TunnelPoolSettings settings) { setSettings(_clientOutboundPools, client, settings); } - private void setSettings(Map pools, Hash client, TunnelPoolSettings settings) { + private void setSettings(Map pools, Hash client, TunnelPoolSettings settings) { TunnelPool pool = null; synchronized (pools) { - pool = (TunnelPool)pools.get(client); + pool = pools.get(client); } if (pool != null) { settings.setDestination(client); // prevent spoofing or unset dest @@ -260,7 +260,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { TunnelPool outbound = null; // should we share the clientPeerSelector across both inbound and outbound? synchronized (_clientInboundPools) { - inbound = (TunnelPool)_clientInboundPools.get(dest); + inbound = _clientInboundPools.get(dest); if (inbound == null) { inbound = new TunnelPool(_context, this, settings.getInboundSettings(), new ClientPeerSelector()); @@ -270,7 +270,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } } synchronized (_clientOutboundPools) { - outbound = (TunnelPool)_clientOutboundPools.get(dest); + outbound = _clientOutboundPools.get(dest); if (outbound == null) { outbound = new TunnelPool(_context, this, settings.getOutboundSettings(), new ClientPeerSelector()); @@ -294,10 +294,10 @@ public class TunnelPoolManager implements TunnelManagerFacade { TunnelPool inbound = null; TunnelPool outbound = null; synchronized (_clientInboundPools) { - inbound = (TunnelPool)_clientInboundPools.remove(destination); + inbound = _clientInboundPools.remove(destination); } synchronized (_clientOutboundPools) { - outbound = (TunnelPool)_clientOutboundPools.remove(destination); + outbound = _clientOutboundPools.remove(destination); } if (inbound != null) inbound.shutdown(); @@ -305,20 +305,24 @@ public class TunnelPoolManager implements TunnelManagerFacade { outbound.shutdown(); } + /** queue a recurring test job if appropriate */ void buildComplete(PooledTunnelCreatorConfig cfg) { - buildComplete(); - if (cfg.getLength() > 1) { + //buildComplete(); + if (cfg.getLength() > 1 && + !_context.router().gracefulShutdownInProgress()) { TunnelPool pool = cfg.getTunnelPool(); if (pool == null) { + // never seen this before, do we reallly need to bother + // trying so hard to find his pool? _log.error("How does this not have a pool? " + cfg, new Exception("baf")); if (cfg.getDestination() != null) { if (cfg.isInbound()) { synchronized (_clientInboundPools) { - pool = (TunnelPool)_clientInboundPools.get(cfg.getDestination()); + pool = _clientInboundPools.get(cfg.getDestination()); } } else { synchronized (_clientOutboundPools) { - pool = (TunnelPool)_clientOutboundPools.get(cfg.getDestination()); + pool = _clientOutboundPools.get(cfg.getDestination()); } } } else { @@ -333,6 +337,8 @@ public class TunnelPoolManager implements TunnelManagerFacade { _context.jobQueue().addJob(new TestJob(_context, cfg, pool)); } } + + /** ?? */ void buildComplete() {} public void startup() { @@ -384,7 +390,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } /** list of TunnelPool instances currently in play */ - void listPools(List out) { + void listPools(List out) { synchronized (_clientInboundPools) { out.addAll(_clientInboundPools.values()); } @@ -407,19 +413,19 @@ public class TunnelPoolManager implements TunnelManagerFacade { out.write("

Exploratory tunnels (config):

\n"); renderPool(out, _inboundExploratory, _outboundExploratory); - List destinations = null; + List destinations = null; synchronized (_clientInboundPools) { destinations = new ArrayList(_clientInboundPools.keySet()); } for (int i = 0; i < destinations.size(); i++) { - Hash client = (Hash)destinations.get(i); + Hash client = destinations.get(i); TunnelPool in = null; TunnelPool outPool = null; synchronized (_clientInboundPools) { - in = (TunnelPool)_clientInboundPools.get(client); + in = _clientInboundPools.get(client); } synchronized (_clientOutboundPools) { - outPool = (TunnelPool)_clientOutboundPools.get(client); + outPool = _clientOutboundPools.get(client); } String name = (in != null ? in.getSettings().getDestinationNickname() : null); if ( (name == null) && (outPool != null) ) @@ -505,7 +511,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } private void renderPool(Writer out, TunnelPool in, TunnelPool outPool) throws IOException { - List tunnels = null; + List tunnels = null; if (in == null) tunnels = new ArrayList(); else @@ -519,7 +525,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { int live = 0; int maxLength = 1; for (int i = 0; i < tunnels.size(); i++) { - TunnelInfo info = (TunnelInfo)tunnels.get(i); + TunnelInfo info = tunnels.get(i); if (info.getLength() > maxLength) maxLength = info.getLength(); } @@ -536,7 +542,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } out.write("\n"); for (int i = 0; i < tunnels.size(); i++) { - TunnelInfo info = (TunnelInfo)tunnels.get(i); + TunnelInfo info = tunnels.get(i); long timeLeft = info.getExpiration()-_context.clock().now(); if (timeLeft <= 0) continue; // don't display tunnels in their grace period From 95aba0c537b44b76bf36cd0fd6d4e3b7809187be Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 26 Aug 2009 22:15:32 +0000 Subject: [PATCH 02/14] * EepGet, I2PSnark: - New I2PSocketEepGet fetches through existing tunnels rather than through the proxy - Use new eepget for i2psnark - Add a fake user agent for non-proxied fetches - Cleanups --- .../src/org/klomp/snark/I2PSnarkUtil.java | 9 +- .../org/klomp/snark/web/I2PSnarkServlet.java | 8 +- .../i2p/i2ptunnel/I2PTunnelHTTPClient.java | 2 +- .../i2p/client/streaming/I2PSocketEepGet.java | 242 ++++++++++++++++++ .../client/streaming/ConnectionOptions.java | 1 + core/java/src/net/i2p/util/EepGet.java | 48 ++-- .../src/net/i2p/util/EepGetScheduler.java | 2 +- core/java/src/net/i2p/util/EepHead.java | 4 +- core/java/src/net/i2p/util/SocketTimeout.java | 11 +- 9 files changed, 297 insertions(+), 30 deletions(-) create mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index aca5fb69e..1069bae61 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -17,6 +17,7 @@ import net.i2p.I2PException; import net.i2p.client.I2PSession; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketEepGet; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.data.DataFormatException; @@ -231,7 +232,13 @@ public class I2PSnarkUtil { if (rewrite) fetchURL = rewriteAnnounce(url); //_log.debug("Rewritten url [" + fetchURL + "]"); - EepGet get = new EepGet(_context, _shouldProxy, _proxyHost, _proxyPort, retries, out.getAbsolutePath(), fetchURL); + //EepGet get = new EepGet(_context, _shouldProxy, _proxyHost, _proxyPort, retries, out.getAbsolutePath(), fetchURL); + // Use our tunnel for announces and .torrent fetches too! Make sure we're connected first... + if (!connected()) { + if (!connect()) + return null; + } + EepGet get = new I2PSocketEepGet(_context, _manager, retries, out.getAbsolutePath(), fetchURL); if (get.fetch()) { _log.debug("Fetch successful [" + url + "]: size=" + out.length()); return out; diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 696cdbe44..b6f688110 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -751,10 +751,10 @@ public class I2PSnarkServlet extends HttpServlet { + openTrackers + "\" size=\"50\" />
\n"); //out.write("\n"); - out.write("EepProxy host: "); - out.write("port:
\n"); + //out.write("EepProxy host: "); + //out.write("port:
\n"); out.write("I2CP host: "); out.write("port: diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java new file mode 100644 index 000000000..5cc8d694f --- /dev/null +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java @@ -0,0 +1,242 @@ +package net.i2p.client.streaming; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.UnknownHostException; +import java.net.URL; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.I2PException; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.EepGet; +import net.i2p.util.SocketTimeout; + +/** + * Fetch a URL using a socket from the supplied I2PSocketManager. + * Hostname must resolve to an i2p destination - no routing to an outproxy. + * Does not support response gzip decompression (unlike I2PTunnelHTTPProxy) (yet), + * but of course there is still gzip at the I2CP layer. + * + * This is designed for Java apps such as bittorrent clients that wish to + * do HTTP fetches and use other protocols on a single set of tunnels. + * This may provide anonymity benefits over using the shared clients HTTP proxy, + * preventing inadvertent outproxy usage, reduce resource usage by eliminating + * a second set of tunnels, and eliminate the requirement to + * to separately configure the proxy host and port. + * + * For additional documentation see the superclass. + * + * Supports http://example.i2p/blah + * Supports http://B32KEY.b32.i2p/blah + * Supports http://i2p/B64KEY/blah for compatibility with the eepproxy + * Supports http://B64KEY/blah for compatibility with the eepproxy + * Warning - does not support /eepproxy/blah, address helpers, http://B64KEY.i2p/blah, + * or other odd things that may be found in the HTTP proxy. + * + * @author zzz + */ +public class I2PSocketEepGet extends EepGet { + private I2PSocketManager _socketManager; + /** this replaces _proxy in the superclass. Sadly, I2PSocket does not extend Socket. */ + private I2PSocket _socket; + + public I2PSocketEepGet(I2PAppContext ctx, I2PSocketManager mgr, int numRetries, String outputFile, String url) { + this(ctx, mgr, numRetries, -1, -1, outputFile, null, url); + } + + public I2PSocketEepGet(I2PAppContext ctx, I2PSocketManager mgr, int numRetries, long minSize, long maxSize, + String outputFile, OutputStream outputStream, String url) { + // we're using this constructor: + // public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, long minSize, long maxSize, String outputFile, OutputStream outputStream, String url, boolean allowCaching, String etag, String postData) { + super(ctx, false, null, -1, numRetries, minSize, maxSize, outputFile, outputStream, url, true, null, null); + _socketManager = mgr; + _log = ctx.logManager().getLog(I2PSocketEepGet.class); + } + + /** + * We have to override this to close _socket, since we can't use _proxy in super as the I2PSocket. + */ + @Override + public boolean fetch(long fetchHeaderTimeout, long totalTimeout, long inactivityTimeout) { + boolean rv = super.fetch(fetchHeaderTimeout, totalTimeout, inactivityTimeout); + if (_socket != null) { + try { + _socket.close(); + _socket = null; + } catch (IOException ioe) {} + } + return rv; + } + + /** + * Look up the address, get a socket from the I2PSocketManager supplied in the constructor, + * and send the request. + * + * @param timeout ignored + */ + @Override + protected void sendRequest(SocketTimeout timeout) throws IOException { + if (_outputStream == null) { + File outFile = new File(_outputFile); + if (outFile.exists()) + _alreadyTransferred = outFile.length(); + } + + if (_proxyIn != null) try { _proxyIn.close(); } catch (IOException ioe) {} + if (_proxyOut != null) try { _proxyOut.close(); } catch (IOException ioe) {} + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} + + try { + URL url = new URL(_actualURL); + if ("http".equals(url.getProtocol())) { + String host = url.getHost(); + int port = url.getPort(); + if (port != -1) + throw new IOException("Ports not supported in i2p: " + _actualURL); + + // HTTP Proxy compatibility http://i2p/B64KEY/blah + // Rewrite the url to strip out the /i2p/, + // as the naming service accepts B64KEY (but not B64KEY.i2p atm) + if ("i2p".equals(host)) { + String file = url.getFile(); + try { + int slash = 1 + file.substring(1).indexOf("/"); + host = file.substring(1, slash); + _actualURL = "http:/" + file.substring(slash); // get the extra slash from the substring + } catch (IndexOutOfBoundsException ioobe) { + throw new IOException("Bad /i2p/ format: " + _actualURL); + } + } + + Destination dest = _context.namingService().lookup(host); + if (dest == null) + throw new UnknownHostException("Unknown or non-i2p host"); + + // Set the timeouts, using the other existing options in the socket manager + // This currently duplicates what SocketTimeout is doing in EepGet, + // but when that's ripped out of EepGet to use setsotimeout, we'll need this. + Properties props = new Properties(); + props.setProperty(I2PSocketOptions.PROP_CONNECT_TIMEOUT, "" + CONNECT_TIMEOUT); + props.setProperty(I2PSocketOptions.PROP_READ_TIMEOUT, "" + INACTIVITY_TIMEOUT); + I2PSocketOptions opts = _socketManager.buildOptions(props); + _socket = _socketManager.connect(dest, opts); + } else { + throw new IOException("Unsupported protocol: " + _actualURL); + } + } catch (MalformedURLException mue) { + throw new IOException("Request URL is invalid: " + _actualURL); + } catch (I2PException ie) { + throw new IOException(ie.toString()); + } + + _proxyIn = _socket.getInputStream(); + _proxyOut = _socket.getOutputStream(); + + // SocketTimeout doesn't take an I2PSocket, but no matter, because we + // always close our socket in fetch() above. + //timeout.setSocket(_socket); + + String req = getRequest(); + _proxyOut.write(DataHelper.getUTF8(req)); + _proxyOut.flush(); + } + + /** + * Guess we have to override this since + * super doesn't strip the http://host from the GET line + * which hoses some servers (opentracker) + * HTTP proxy was kind enough to do this for us + */ + @Override + protected String getRequest() throws IOException { + StringBuilder buf = new StringBuilder(2048); + URL url = new URL(_actualURL); + String host = url.getHost(); + String path = url.getPath(); + String query = url.getQuery(); + if (query != null) + path = path + '?' + query; + if (!path.startsWith("/")) + path = '/' + path; + buf.append("GET ").append(path).append(" HTTP/1.1\r\n" + + "Host: ").append(url.getHost()).append("\r\n"); + if (_alreadyTransferred > 0) { + buf.append("Range: bytes="); + buf.append(_alreadyTransferred); + buf.append("-\r\n"); + } + buf.append("Accept-Encoding: \r\n" + + "Cache-control: no-cache\r\n" + + "Pragma: no-cache\r\n" + + "User-Agent: " + USER_AGENT + "\r\n" + + "Connection: close\r\n\r\n"); + return buf.toString(); + } + + /** + * I2PSocketEepGet [-n #retries] [-t timeout] url + * Uses I2CP at localhost:7654 with a single 1-hop tunnel each direction. + * Tunnel build time not included in the timeout. + * + * This is just for testing, it will be commented out someday. + * Real command line apps should use EepGet.main(), + * which has more options, and you don't have to wait for tunnels to be built. + */ + public static void main(String args[]) { + int numRetries = 0; + long inactivityTimeout = INACTIVITY_TIMEOUT; + String url = null; + try { + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-n")) { + numRetries = Integer.parseInt(args[i+1]); + i++; + } else if (args[i].equals("-t")) { + inactivityTimeout = 1000 * Integer.parseInt(args[i+1]); + i++; + } else if (args[i].startsWith("-")) { + usage(); + return; + } else { + url = args[i]; + } + } + } catch (Exception e) { + e.printStackTrace(); + usage(); + return; + } + + if (url == null) { + usage(); + return; + } + + Properties opts = new Properties(); + opts.setProperty("i2cp.dontPublishLeaseSet", "true"); + opts.setProperty("inbound.quantity", "1"); + opts.setProperty("outbound.quantity", "1"); + opts.setProperty("inbound.length", "1"); + opts.setProperty("outbound.length", "1"); + opts.setProperty("inbound.nickname", "I2PSocketEepGet"); + I2PSocketManager mgr = I2PSocketManagerFactory.createManager(opts); + if (mgr == null) { + System.err.println("Error creating the socket manager"); + return; + } + I2PSocketEepGet get = new I2PSocketEepGet(I2PAppContext.getGlobalContext(), + mgr, numRetries, suggestName(url), url); + get.addStatusListener(get.new CLIStatusListener(1024, 40)); + get.fetch(inactivityTimeout, -1, inactivityTimeout); + mgr.destroySocketManager(); + } + + private static void usage() { + System.err.println("I2PSocketEepGet [-n #retries] [-t timeout] url"); + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 140c6d6b9..bd32afbf7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -266,6 +266,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { if (opts.contains(PROP_SLOW_START_GROWTH_RATE_FACTOR)) setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); if (opts.containsKey(PROP_CONNECT_TIMEOUT)) + // wow 5 minutes!!! FIXME!! setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); if (opts.containsKey(PROP_ANSWER_PINGS)) setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); diff --git a/core/java/src/net/i2p/util/EepGet.java b/core/java/src/net/i2p/util/EepGet.java index 0ab800cee..43c0e40c6 100644 --- a/core/java/src/net/i2p/util/EepGet.java +++ b/core/java/src/net/i2p/util/EepGet.java @@ -27,7 +27,7 @@ import net.i2p.data.DataHelper; * Bug: a malformed url http://example.i2p (no trailing '/') fails cryptically */ public class EepGet { - private I2PAppContext _context; + protected I2PAppContext _context; protected Log _log; protected boolean _shouldProxy; private String _proxyHost; @@ -35,8 +35,8 @@ public class EepGet { protected int _numRetries; private long _minSize; // minimum and maximum acceptable response size, -1 signifies unlimited, private long _maxSize; // applied both against whole responses and chunks - private String _outputFile; - private OutputStream _outputStream; + protected String _outputFile; + protected OutputStream _outputStream; /** url we were asked to fetch */ protected String _url; /** the URL we actually fetch from (may differ from the _url in case of redirect) */ @@ -47,10 +47,10 @@ public class EepGet { private boolean _keepFetching; private Socket _proxy; - private OutputStream _proxyOut; - private InputStream _proxyIn; + protected OutputStream _proxyOut; + protected InputStream _proxyIn; protected OutputStream _out; - private long _alreadyTransferred; + protected long _alreadyTransferred; private long _bytesTransferred; protected long _bytesRemaining; protected int _currentAttempt; @@ -67,6 +67,10 @@ public class EepGet { protected long _fetchInactivityTimeout; protected int _redirects; protected String _redirectLocation; + /** this will be replaced by the HTTP Proxy if we are using it */ + protected static final String USER_AGENT = "Wget/1.11.4"; + protected static final long CONNECT_TIMEOUT = 45*1000; + protected static final long INACTIVITY_TIMEOUT = 60*1000; public EepGet(I2PAppContext ctx, String proxyHost, int proxyPort, int numRetries, String outputFile, String url) { this(ctx, true, proxyHost, proxyPort, numRetries, outputFile, url); @@ -118,7 +122,7 @@ public class EepGet { _transferFailed = false; _headersRead = false; _aborted = false; - _fetchHeaderTimeout = 45*1000; + _fetchHeaderTimeout = CONNECT_TIMEOUT; _listeners = new ArrayList(1); _etag = etag; _lastModified = lastModified; @@ -134,7 +138,7 @@ public class EepGet { int numRetries = 5; int markSize = 1024; int lineLen = 40; - int inactivityTimeout = 60*1000; + long inactivityTimeout = INACTIVITY_TIMEOUT; String etag = null; String saveAs = null; String url = null; @@ -183,7 +187,7 @@ public class EepGet { EepGet get = new EepGet(I2PAppContext.getGlobalContext(), true, proxyHost, proxyPort, numRetries, saveAs, url, true, etag); get.addStatusListener(get.new CLIStatusListener(markSize, lineLen)); - get.fetch(45*1000, -1, inactivityTimeout); + get.fetch(CONNECT_TIMEOUT, -1, inactivityTimeout); } public static String suggestName(String url) { @@ -216,7 +220,7 @@ public class EepGet { return buf.toString(); } - protected static void usage() { + private static void usage() { System.err.println("EepGet [-p 127.0.0.1:4444] [-n #retries] [-o outputFile] [-m markSize lineLen] [-t timeout] url"); } @@ -247,7 +251,7 @@ public class EepGet { public void headerReceived(String url, int currentAttempt, String key, String val); public void attempting(String url); } - private class CLIStatusListener implements StatusListener { + protected class CLIStatusListener implements StatusListener { private int _markSize; private int _lineSize; private long _startedOn; @@ -497,7 +501,7 @@ public class EepGet { if (_fetchInactivityTimeout > 0) timeout.setInactivityTimeout(_fetchInactivityTimeout); else - timeout.setInactivityTimeout(60*1000); + timeout.setInactivityTimeout(INACTIVITY_TIMEOUT); if (_redirectLocation != null) { try { @@ -829,12 +833,12 @@ public class EepGet { } } - private void increment(byte[] lookahead, int cur) { + private static void increment(byte[] lookahead, int cur) { lookahead[0] = lookahead[1]; lookahead[1] = lookahead[2]; lookahead[2] = (byte)cur; } - private boolean isEndOfHeaders(byte lookahead[]) { + private static boolean isEndOfHeaders(byte lookahead[]) { byte first = lookahead[0]; byte second = lookahead[1]; byte third = lookahead[2]; @@ -844,7 +848,7 @@ public class EepGet { /** we ignore any potential \r, since we trim it on write anyway */ private static final byte NL = '\n'; - private boolean isNL(byte b) { return (b == NL); } + private static boolean isNL(byte b) { return (b == NL); } protected void sendRequest(SocketTimeout timeout) throws IOException { if (_outputStream != null) { @@ -895,7 +899,7 @@ public class EepGet { } protected String getRequest() throws IOException { - StringBuilder buf = new StringBuilder(512); + StringBuilder buf = new StringBuilder(2048); boolean post = false; if ( (_postData != null) && (_postData.length() > 0) ) post = true; @@ -906,7 +910,7 @@ public class EepGet { String path = url.getPath(); String query = url.getQuery(); if (query != null) - path = path + "?" + query; + path = path + '?' + query; if (!path.startsWith("/")) path = "/" + path; if ( (port == 80) || (port == 443) || (port <= 0) ) path = proto + "://" + host + path; @@ -923,12 +927,11 @@ public class EepGet { buf.append(_alreadyTransferred); buf.append("-\r\n"); } - buf.append("Accept-Encoding: \r\n"); if (_shouldProxy) buf.append("X-Accept-Encoding: x-i2p-gzip;q=1.0, identity;q=0.5, deflate;q=0, gzip;q=0, *;q=0\r\n"); if (!_allowCaching) { - buf.append("Cache-control: no-cache\r\n"); - buf.append("Pragma: no-cache\r\n"); + buf.append("Cache-control: no-cache\r\n" + + "Pragma: no-cache\r\n"); } if ((_etag != null) && (_alreadyTransferred <= 0)) { buf.append("If-None-Match: "); @@ -942,7 +945,10 @@ public class EepGet { } if (post) buf.append("Content-length: ").append(_postData.length()).append("\r\n"); - buf.append("Connection: close\r\n\r\n"); + // This will be replaced if we are going through I2PTunnelHTTPClient + buf.append("User-Agent: " + USER_AGENT + "\r\n" + + "Accept-Encoding: \r\n" + + "Connection: close\r\n\r\n"); if (post) buf.append(_postData); if (_log.shouldLog(Log.DEBUG)) diff --git a/core/java/src/net/i2p/util/EepGetScheduler.java b/core/java/src/net/i2p/util/EepGetScheduler.java index 86db28540..54c434225 100644 --- a/core/java/src/net/i2p/util/EepGetScheduler.java +++ b/core/java/src/net/i2p/util/EepGetScheduler.java @@ -7,7 +7,7 @@ import java.util.List; import net.i2p.I2PAppContext; /** - * + * @deprecated unused a webapp version would be nice though */ public class EepGetScheduler implements EepGet.StatusListener { private I2PAppContext _context; diff --git a/core/java/src/net/i2p/util/EepHead.java b/core/java/src/net/i2p/util/EepHead.java index 5127ed93b..38438a402 100644 --- a/core/java/src/net/i2p/util/EepHead.java +++ b/core/java/src/net/i2p/util/EepHead.java @@ -93,7 +93,7 @@ public class EepHead extends EepGet { } } - protected static void usage() { + private static void usage() { System.err.println("EepHead [-p 127.0.0.1:4444] [-n #retries] [-t timeout] url"); } @@ -191,6 +191,8 @@ public class EepHead extends EepGet { buf.append("Accept-Encoding: \r\n"); if (_shouldProxy) buf.append("X-Accept-Encoding: x-i2p-gzip;q=1.0, identity;q=0.5, deflate;q=0, gzip;q=0, *;q=0\r\n"); + // This will be replaced if we are going through I2PTunnelHTTPClient + buf.append("User-Agent: " + USER_AGENT + "\r\n"); buf.append("Connection: close\r\n\r\n"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Request: [" + buf.toString() + "]"); diff --git a/core/java/src/net/i2p/util/SocketTimeout.java b/core/java/src/net/i2p/util/SocketTimeout.java index 3813ec6e8..63f54d45d 100644 --- a/core/java/src/net/i2p/util/SocketTimeout.java +++ b/core/java/src/net/i2p/util/SocketTimeout.java @@ -5,6 +5,15 @@ import java.net.Socket; import java.text.SimpleDateFormat; import java.util.Date; +/** + * This should be deprecated. + * It is only used by EepGet, and it uses the inefficient SimpleTimer. + * The only advantage seems to be a total timeout period, which is the second + * argument to EepGet.fetch(headerTimeout, totalTimeout, inactivityTimeout), + * which is most likely always set to -1. + * + * Use socket.setsotimeout instead? + */ public class SocketTimeout implements SimpleTimer.TimedEvent { private Socket _targetSocket; private long _startTime; @@ -69,4 +78,4 @@ public class SocketTimeout implements SimpleTimer.TimedEvent { buf.append("cancelled? ").append(_cancelled); return buf.toString(); } -} \ No newline at end of file +} From bb14fa0b4e02e508670c96c48b45719051677f39 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 26 Aug 2009 22:16:29 +0000 Subject: [PATCH 03/14] * Console: Prevent OOMs in NewsFetcher or StatsSummarizer from killing the router --- .../java/src/net/i2p/router/web/RouterConsoleRunner.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java index 22ee8a38b..8529f13ff 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java @@ -12,7 +12,7 @@ import net.i2p.apps.systray.SysTray; import net.i2p.data.DataHelper; import net.i2p.router.RouterContext; import net.i2p.util.FileUtil; -import net.i2p.util.I2PThread; +import net.i2p.util.I2PAppThread; import org.mortbay.http.DigestAuthenticator; import org.mortbay.http.HashUserRealm; @@ -160,11 +160,11 @@ public class RouterConsoleRunner { } NewsFetcher fetcher = NewsFetcher.getInstance(I2PAppContext.getGlobalContext()); - I2PThread t = new I2PThread(fetcher, "NewsFetcher"); + Thread t = new I2PAppThread(fetcher, "NewsFetcher"); t.setDaemon(true); t.start(); - I2PThread st = new I2PThread(new StatSummarizer(), "StatSummarizer"); + Thread st = new I2PAppThread(new StatSummarizer(), "StatSummarizer"); st.setDaemon(true); st.start(); } From 7973f2e8b9475bec932846535fc8421621fd8ab0 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 26 Aug 2009 22:17:29 +0000 Subject: [PATCH 04/14] * DataHelper: Fix byte array hashcode for small arrays --- core/java/src/net/i2p/data/DataHelper.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java index fdd560243..655420282 100644 --- a/core/java/src/net/i2p/data/DataHelper.java +++ b/core/java/src/net/i2p/data/DataHelper.java @@ -798,12 +798,20 @@ public class DataHelper { * */ public static int hashCode(byte b[]) { + // Java 5 now has its own method, and the old way + // is horrible for arrays smaller than 32. + // otoh, for sizes >> 32, java's method may be too slow int rv = 0; if (b != null) { - for (int i = 0; i < b.length && i < 32; i++) - rv += (b[i] << i); + if (b.length <= 32) { + rv = Arrays.hashCode(b); + } else { + for (int i = 0; i < b.length && i < 32; i++) + rv ^= (b[i] << i); // xor better than + in tests + } } return rv; + } /** From 93d366fea116f1d4aa6863f3cb29c4032ad16ce3 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 26 Aug 2009 22:18:15 +0000 Subject: [PATCH 05/14] * Tunnel: Concurrentify HashSetIVValidator --- .../net/i2p/router/tunnel/HashSetIVValidator.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/HashSetIVValidator.java b/router/java/src/net/i2p/router/tunnel/HashSetIVValidator.java index c9241fcce..0dbbe2468 100644 --- a/router/java/src/net/i2p/router/tunnel/HashSetIVValidator.java +++ b/router/java/src/net/i2p/router/tunnel/HashSetIVValidator.java @@ -1,18 +1,19 @@ package net.i2p.router.tunnel; -import java.util.HashSet; +import java.util.Set; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; +import net.i2p.util.ConcurrentHashSet; /** * waste lots of RAM */ class HashSetIVValidator implements IVValidator { - private final HashSet _received; + private final Set _received; public HashSetIVValidator() { - _received = new HashSet(); + _received = new ConcurrentHashSet(); } public boolean receiveIV(byte ivData[], int ivOffset, byte payload[], int payloadOffset) { @@ -21,10 +22,7 @@ class HashSetIVValidator implements IVValidator { byte iv[] = new byte[HopProcessor.IV_LENGTH]; DataHelper.xor(ivData, ivOffset, payload, payloadOffset, iv, 0, HopProcessor.IV_LENGTH); ByteArray ba = new ByteArray(iv); - boolean isNew = false; - synchronized (_received) { - isNew = _received.add(ba); - } + boolean isNew = _received.add(ba); return isNew; } -} \ No newline at end of file +} From 593d4dc50813a9558bc43d7e47d3e9bd7f031679 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 26 Aug 2009 22:22:47 +0000 Subject: [PATCH 06/14] * DecayingBloomFilter: - Replace with new DecayingHashSet for 3 of 4 uses, and also in the 4th if the router is low-bandwidth. Saves 8 MB heap. --- .../src/net/i2p/util/DecayingBloomFilter.java | 68 +++- .../src/net/i2p/util/DecayingHashSet.java | 380 ++++++++++++++++++ .../src/net/i2p/router/MessageValidator.java | 3 +- .../udp/InboundMessageFragments.java | 3 +- .../router/tunnel/BloomFilterIVValidator.java | 24 +- .../router/tunnel/BuildMessageProcessor.java | 3 +- 6 files changed, 467 insertions(+), 14 deletions(-) create mode 100644 core/java/src/net/i2p/util/DecayingHashSet.java diff --git a/core/java/src/net/i2p/util/DecayingBloomFilter.java b/core/java/src/net/i2p/util/DecayingBloomFilter.java index 95da0a03b..a1029ae1b 100644 --- a/core/java/src/net/i2p/util/DecayingBloomFilter.java +++ b/core/java/src/net/i2p/util/DecayingBloomFilter.java @@ -14,6 +14,10 @@ import org.xlattice.crypto.filters.BloomSHA1; * entries per second with virtually no false positive rate. Down the line, * this may be refactored to allow tighter control of the size necessary for the * contained bloom filters, but a fixed 2MB overhead isn't that bad. + * + * NOTE: At 1MBps, the tunnel IVV will see an unacceptable false positive rate + * of almost 0.1% with the current m and k values; however using DHS instead will use 30MB. + * Further analysis and tweaking for the tunnel IVV may be required. */ public class DecayingBloomFilter { private I2PAppContext _context; @@ -26,13 +30,18 @@ public class DecayingBloomFilter { private byte _extended[]; private byte _longToEntry[]; private long _longToEntryMask; - private long _currentDuplicates; + protected long _currentDuplicates; private boolean _keepDecaying; private DecayEvent _decayEvent; + /** just for logging */ + private String _name; private static final int DEFAULT_M = 23; private static final boolean ALWAYS_MISS = false; + /** noop for DHS */ + public DecayingBloomFilter() {} + /** * Create a bloom filter that will decay its entries over time. * @@ -42,9 +51,15 @@ public class DecayingBloomFilter { * against with sufficient random values. */ public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes) { + this(context, durationMs, entryBytes, "DBF"); + } + + /** @param name just for logging / debugging / stats */ + public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes, String name) { _context = context; _log = context.logManager().getLog(DecayingBloomFilter.class); _entryBytes = entryBytes; + _name = name; // this is instantiated in four different places, they may have different // requirements, but for now use this as a gross method of memory reduction. // m == 23 => 1MB each BloomSHA1 (4 pairs = 8MB total) @@ -67,6 +82,17 @@ public class DecayingBloomFilter { _decayEvent = new DecayEvent(); _keepDecaying = true; SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs); + if (_log.shouldLog(Log.WARN)) + _log.warn("New DBF " + name + " m = " + m + " entryBytes = " + entryBytes + + " numExtenders = " + numExtenders + " cycle (s) = " + (durationMs / 1000)); + // try to get a handle on memory usage vs. false positives + context.statManager().createRateStat("router.decayingBloomFilter." + name + ".size", + "Size", "Router", new long[] { Math.max(60*1000, durationMs) }); + context.statManager().createRateStat("router.decayingBloomFilter." + name + ".dups", + "1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) }); + context.statManager().createRateStat("router.decayingBloomFilter." + name + ".log10(falsePos)", + "log10 of the false positive rate (must have net.i2p.util.DecayingBloomFilter=DEBUG)", + "Router", new long[] { Math.max(60*1000, durationMs) }); } public long getCurrentDuplicateCount() { return _currentDuplicates; } @@ -196,9 +222,12 @@ public class DecayingBloomFilter { private void decay() { int currentCount = 0; long dups = 0; + double fpr = 0d; synchronized (this) { BloomSHA1 tmp = _previous; currentCount = _current.size(); + if (_log.shouldLog(Log.DEBUG) && currentCount > 0) + fpr = _current.falsePositives(); _previous = _current; _current = tmp; _current.clear(); @@ -206,8 +235,19 @@ public class DecayingBloomFilter { _currentDuplicates = 0; } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Decaying the filter after inserting " + currentCount - + " elements and " + dups + " false positives"); + _log.debug("Decaying the filter " + _name + " after inserting " + currentCount + + " elements and " + dups + " false positives with FPR = " + fpr); + _context.statManager().addRateData("router.decayingBloomFilter." + _name + ".size", + currentCount, 0); + if (currentCount > 0) + _context.statManager().addRateData("router.decayingBloomFilter." + _name + ".dups", + 1000l*1000*dups/currentCount, 0); + if (fpr > 0d) { + // only if log.shouldLog(Log.DEBUG) ... + long exponent = (long) Math.log10(fpr); + _context.statManager().addRateData("router.decayingBloomFilter." + _name + ".log10(falsePos)", + exponent, 0); + } } private class DecayEvent implements SimpleTimer.TimedEvent { @@ -219,18 +259,27 @@ public class DecayingBloomFilter { } } + /** + * Theoretical false positive rate for 16 KBps: 1.17E-21 + * Theoretical false positive rate for 24 KBps: 9.81E-20 + * Theoretical false positive rate for 32 KBps: 2.24E-18 + * Theoretical false positive rate for 256 KBps: 7.45E-9 + * Theoretical false positive rate for 512 KBps: 5.32E-6 + * Theoretical false positive rate for 1024 KBps: 1.48E-3 + */ public static void main(String args[]) { int kbps = 256; - int iterations = 100; + int iterations = 10; testByLong(kbps, iterations); testByBytes(kbps, iterations); } - public static void testByLong(int kbps, int numRuns) { + private static void testByLong(int kbps, int numRuns) { int messages = 60 * 10 * kbps; Random r = new Random(); DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 8); int falsePositives = 0; long totalTime = 0; + double fpr = 0d; for (int j = 0; j < numRuns; j++) { long start = System.currentTimeMillis(); for (int i = 0; i < messages; i++) { @@ -240,15 +289,17 @@ public class DecayingBloomFilter { } } totalTime += System.currentTimeMillis() - start; + fpr = filter.getFalsePositiveRate(); filter.clear(); } filter.stopDecaying(); + System.out.println("False postive rate should be " + fpr); System.out.println("After " + numRuns + " runs pushing " + messages + " entries in " + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were " + falsePositives + " false positives"); } - public static void testByBytes(int kbps, int numRuns) { + private static void testByBytes(int kbps, int numRuns) { byte iv[][] = new byte[60*10*kbps][16]; Random r = new Random(); for (int i = 0; i < iv.length; i++) @@ -257,18 +308,21 @@ public class DecayingBloomFilter { DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 16); int falsePositives = 0; long totalTime = 0; + double fpr = 0d; for (int j = 0; j < numRuns; j++) { long start = System.currentTimeMillis(); for (int i = 0; i < iv.length; i++) { if (filter.add(iv[i])) { falsePositives++; - System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")"); + System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")"); } } totalTime += System.currentTimeMillis() - start; + fpr = filter.getFalsePositiveRate(); filter.clear(); } filter.stopDecaying(); + System.out.println("False postive rate should be " + fpr); System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in " + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were " + falsePositives + " false positives"); diff --git a/core/java/src/net/i2p/util/DecayingHashSet.java b/core/java/src/net/i2p/util/DecayingHashSet.java new file mode 100644 index 000000000..a72b6b9e2 --- /dev/null +++ b/core/java/src/net/i2p/util/DecayingHashSet.java @@ -0,0 +1,380 @@ +package net.i2p.util; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.Random; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; + + +/** + * Double buffered hash set. + * Since DecayingBloomFilter was instantiated 4 times for a total memory usage + * of 8MB, it seemed like we could do a lot better, given these usage stats + * on a class L router: + * + * ./router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java: + * 32 bytes, peak 10 entries in 1m + * + * ./router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java: + * 4 bytes, peak 150 entries in 10s + * + * ./router/java/src/net/i2p/router/MessageValidator.java: + * 8 bytes, peak 1K entries in 2m + * + * ./router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java: + * 16 bytes, peak 15K entries in 10m + * + * If the ArrayWrapper object in the HashSet is 50 bytes, and BloomSHA1(23, 11) is 1MB, + * then for less than 20K entries this is smaller. + * And this uses space proportional to traffiic, so it doesn't penalize small routers + * with a fixed 8MB. + * So let's try it for the first 2 or 3, for now. + * + * Also, DBF is syncrhonized, and uses SimpleTimer. + * Here we use a read/write lock, with synchronization only + * when switching double buffers, and we use SimpleScheduler. + * + * Yes, we could stare at stats all day, and try to calculate an acceptable + * false-positive rate for each of the above uses, then estimate the DBF size + * required to meet that rate for a given usage. Or even start adjusting the + * Bloom filter m and k values on a per-DBF basis. But it's a whole lot easier + * to implement something with a zero false positive rate, and uses less memory + * for almost all bandwidth classes. + * + * This has a strictly zero false positive rate for <= 8 byte keys. + * For larger keys, it is 1 / (2**64) ~= 5E-20, which is better than + * DBF for any entry count greater than about 14K. + * + * DBF has a zero false negative rate over the period + * 2 * durationMs. And a 100% false negative rate beyond that period. + * This has the same properties. + * + * This performs about twice as fast as DBF in the test below. + * + * @author zzz + */ +public class DecayingHashSet extends DecayingBloomFilter { + private final I2PAppContext _context; + private final Log _log; + private ConcurrentHashSet _current; + private ConcurrentHashSet _previous; + private int _durationMs; + private int _entryBytes; + private volatile boolean _keepDecaying; + private final DecayEvent _decayEvent; + /** just for logging */ + private final String _name; + /** synchronize against this lock when switching double buffers */ + private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true); + + + /** + * Create a double-buffered hash set that will decay its entries over time. + * + * @param durationMs entries last for at least this long, but no more than twice this long + * @param entryBytes how large are the entries to be added? 1 to 32 bytes + */ + public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes) { + this(context, durationMs, entryBytes, "DHS"); + } + + /** @param name just for logging / debugging / stats */ + public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes, String name) { + if (entryBytes <= 0 || entryBytes > 32) + throw new IllegalArgumentException("Bad size"); + _context = context; + _log = context.logManager().getLog(DecayingHashSet.class); + _entryBytes = entryBytes; + _name = name; + _current = new ConcurrentHashSet(128); + _previous = new ConcurrentHashSet(128); + _durationMs = durationMs; + _currentDuplicates = 0; + _decayEvent = new DecayEvent(); + _keepDecaying = true; + SimpleScheduler.getInstance().addEvent(_decayEvent, _durationMs); + if (_log.shouldLog(Log.WARN)) + _log.warn("New DHS " + name + " entryBytes = " + entryBytes + + " cycle (s) = " + (durationMs / 1000)); + // try to get a handle on memory usage vs. false positives + context.statManager().createRateStat("router.decayingHashSet." + name + ".size", + "Size", "Router", new long[] { Math.max(60*1000, durationMs) }); + context.statManager().createRateStat("router.decayingHashSet." + name + ".dups", + "1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) }); + } + + /** unsynchronized but only used for logging elsewhere */ + @Override + public int getInsertedCount() { + return _current.size() + _previous.size(); + } + /** pointless, only used for logging elsewhere */ + @Override + public double getFalsePositiveRate() { + if (_entryBytes <= 8) + return 0d; + return 1d / Math.pow(2d, 64d); // 5.4E-20 + } + + /** + * @return true if the entry added is a duplicate + * + */ + @Override + public boolean add(byte entry[], int off, int len) { + if (entry == null) + throw new IllegalArgumentException("Null entry"); + if (len != _entryBytes) + throw new IllegalArgumentException("Bad entry [" + len + ", expected " + + _entryBytes + "]"); + getReadLock(); + try { + return locked_add(entry, off, len, true); + } finally { releaseReadLock(); } + } + + /** + * @return true if the entry added is a duplicate. the number of low order + * bits used is determined by the entryBytes parameter used on creation of the + * filter. + * + */ + @Override + public boolean add(long entry) { + return add(entry, true); + } + + /** + * @return true if the entry is already known. this does NOT add the + * entry however. + * + */ + @Override + public boolean isKnown(long entry) { + return add(entry, false); + } + + private boolean add(long entry, boolean addIfNew) { + int len = Math.min(8, _entryBytes); + byte[] b = toLong(len, entry); + getReadLock(); + try { + return locked_add(b, 0, len, addIfNew); + } finally { releaseReadLock(); } + } + + /** from DataHelper, except negative values ok */ + private static byte[] toLong(int numBytes, long value) { + byte target[] = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) + target[numBytes-i-1] = (byte)(value >>> (i*8)); + return target; + } + + /** so many questions... */ + private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) { + ArrayWrapper w = new ArrayWrapper(entry, offset, len); + boolean seen = _current.contains(w); + seen = seen || _previous.contains(w); + if (seen) { + // why increment if addIfNew == false? + // why not add to current if only in previous? + _currentDuplicates++; + } else if (addIfNew) { + _current.add(w); + // why add to previous? + _previous.add(w); + } + return seen; + } + + @Override + public void clear() { + _current.clear(); + _previous.clear(); + _currentDuplicates = 0; + } + + /** super doesn't call clear, but neither do the users, so it seems like we should here */ + @Override + public void stopDecaying() { + _keepDecaying = false; + clear(); + } + + private void decay() { + int currentCount = 0; + long dups = 0; + if (!getWriteLock()) + return; + try { + ConcurrentHashSet tmp = _previous; + currentCount = _current.size(); + _previous = _current; + _current = tmp; + _current.clear(); + dups = _currentDuplicates; + _currentDuplicates = 0; + } finally { releaseWriteLock(); } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Decaying the filter " + _name + " after inserting " + currentCount + + " elements and " + dups + " false positives"); + _context.statManager().addRateData("router.decayingHashSet." + _name + ".size", + currentCount, 0); + if (currentCount > 0) + _context.statManager().addRateData("router.decayingHashSet." + _name + ".dups", + 1000l*1000*dups/currentCount, 0); + } + + /** if decay() ever blows up, we won't reschedule, and will grow unbounded, but it seems unlikely */ + private class DecayEvent implements SimpleTimer.TimedEvent { + public void timeReached() { + if (_keepDecaying) { + decay(); + SimpleScheduler.getInstance().addEvent(DecayEvent.this, _durationMs); + } + } + } + + private void getReadLock() { + _reorganizeLock.readLock().lock(); + } + + private void releaseReadLock() { + _reorganizeLock.readLock().unlock(); + } + + /** @return true if the lock was acquired */ + private boolean getWriteLock() { + try { + boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS); + if (!rv) + _log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats")); + return rv; + } catch (InterruptedException ie) {} + return false; + } + + private void releaseWriteLock() { + _reorganizeLock.writeLock().unlock(); + } + + /** + * This saves the data as-is if the length is <= 8 bytes, + * otherwise it stores an 8-byte hash. + * Hash function is from DataHelper, modded to get + * the maximum entropy given the length of the data. + */ + private static class ArrayWrapper { + private long _longhashcode; + public ArrayWrapper(byte[] b, int offset, int len) { + int idx = offset; + int shift = Math.min(8, 64 / len); + for (int i = 0; i < len; i++) { + // xor better than + in tests + _longhashcode ^= (((long) b[idx++]) << (i * shift)); + } + } + + public int hashCode() { + return (int) _longhashcode; + } + + public long longHashCode() { + return _longhashcode; + } + + public boolean equals(Object o) { + if (o == null || !(o instanceof ArrayWrapper)) + return false; + return ((ArrayWrapper) o).longHashCode() == _longhashcode; + } + } + + /** + * vs. DBF, this measures 1.93x faster for testByLong and 2.46x faster for testByBytes. + */ + public static void main(String args[]) { + /** KBytes per sec, 1 message per KByte */ + int kbps = 256; + int iterations = 10; + //testSize(); + testByLong(kbps, iterations); + testByBytes(kbps, iterations); + } + + /** and the answer is: 49.9 bytes. The ArrayWrapper alone measured 16, so that's 34 for the HashSet entry. */ +/***** + private static void testSize() { + int qty = 256*1024; + byte b[] = new byte[8]; + Random r = new Random(); + long old = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + ConcurrentHashSet foo = new ConcurrentHashSet(qty); + for (int i = 0; i < qty; i++) { + r.nextBytes(b); + foo.add(new ArrayWrapper(b, 0, 8)); + } + long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.out.println("Memory per ArrayWrapper: " + (((double) (used - old)) / qty)); + } +*****/ + + /** 8 bytes, simulate the router message validator */ + private static void testByLong(int kbps, int numRuns) { + int messages = 60 * 10 * kbps; + Random r = new Random(); + DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 8); + int falsePositives = 0; + long totalTime = 0; + for (int j = 0; j < numRuns; j++) { + long start = System.currentTimeMillis(); + for (int i = 0; i < messages; i++) { + if (filter.add(r.nextLong())) { + falsePositives++; + System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")"); + } + } + totalTime += System.currentTimeMillis() - start; + filter.clear(); + } + System.out.println("False postive rate should be " + filter.getFalsePositiveRate()); + filter.stopDecaying(); + System.out.println("After " + numRuns + " runs pushing " + messages + " entries in " + + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were " + + falsePositives + " false positives"); + + } + + /** 16 bytes, simulate the tunnel IV validator */ + private static void testByBytes(int kbps, int numRuns) { + byte iv[][] = new byte[60*10*kbps][16]; + Random r = new Random(); + for (int i = 0; i < iv.length; i++) + r.nextBytes(iv[i]); + + DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 16); + int falsePositives = 0; + long totalTime = 0; + for (int j = 0; j < numRuns; j++) { + long start = System.currentTimeMillis(); + for (int i = 0; i < iv.length; i++) { + if (filter.add(iv[i])) { + falsePositives++; + System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")"); + } + } + totalTime += System.currentTimeMillis() - start; + filter.clear(); + } + System.out.println("False postive rate should be " + filter.getFalsePositiveRate()); + filter.stopDecaying(); + System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in " + + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were " + + falsePositives + " false positives"); + } +} diff --git a/router/java/src/net/i2p/router/MessageValidator.java b/router/java/src/net/i2p/router/MessageValidator.java index a5c0a8217..b673702a1 100644 --- a/router/java/src/net/i2p/router/MessageValidator.java +++ b/router/java/src/net/i2p/router/MessageValidator.java @@ -1,6 +1,7 @@ package net.i2p.router; import net.i2p.util.DecayingBloomFilter; +import net.i2p.util.DecayingHashSet; import net.i2p.util.Log; /** @@ -95,7 +96,7 @@ public class MessageValidator { } public void startup() { - _filter = new DecayingBloomFilter(_context, (int)Router.CLOCK_FUDGE_FACTOR * 2, 8); + _filter = new DecayingHashSet(_context, (int)Router.CLOCK_FUDGE_FACTOR * 2, 8, "RouterMV"); } void shutdown() { diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 9315a9ff0..04c2a3184 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -5,6 +5,7 @@ import java.util.Map; import net.i2p.data.Hash; import net.i2p.router.RouterContext; import net.i2p.util.DecayingBloomFilter; +import net.i2p.util.DecayingHashSet; import net.i2p.util.Log; /** @@ -52,7 +53,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource // may want to extend the DecayingBloomFilter so we can use a smaller // array size (currently its tuned for 10 minute rates for the // messageValidator) - _recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 4); + _recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF"); _ackSender.startup(); _messageReceiver.startup(); } diff --git a/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java b/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java index 5248944e6..2b05d0d20 100644 --- a/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java +++ b/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java @@ -1,10 +1,11 @@ package net.i2p.router.tunnel; -import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; +import net.i2p.router.RouterContext; import net.i2p.util.ByteCache; import net.i2p.util.DecayingBloomFilter; +import net.i2p.util.DecayingHashSet; /** * Manage the IV validation for all of the router's tunnels by way of a big @@ -12,7 +13,7 @@ import net.i2p.util.DecayingBloomFilter; * */ public class BloomFilterIVValidator implements IVValidator { - private I2PAppContext _context; + private RouterContext _context; private DecayingBloomFilter _filter; private ByteCache _ivXorCache = ByteCache.getInstance(32, HopProcessor.IV_LENGTH); @@ -23,9 +24,17 @@ public class BloomFilterIVValidator implements IVValidator { * */ private static final int HALFLIFE_MS = 10*60*1000; - public BloomFilterIVValidator(I2PAppContext ctx, int KBps) { + private static final int MIN_SHARE_KBPS_TO_USE_BLOOM = 64; + + public BloomFilterIVValidator(RouterContext ctx, int KBps) { _context = ctx; - _filter = new DecayingBloomFilter(ctx, HALFLIFE_MS, 16); + // Select the filter based on share bandwidth. + // Note that at rates approaching 1MB, we need to do something else, + // as the Bloom filter false positive rates approach 0.1%. FIXME + if (getShareBandwidth(ctx) < MIN_SHARE_KBPS_TO_USE_BLOOM) + _filter = new DecayingHashSet(ctx, HALFLIFE_MS, 16, "TunnelIVV"); // appx. 4MB max + else + _filter = new DecayingBloomFilter(ctx, HALFLIFE_MS, 16, "TunnelIVV"); // 2MB fixed ctx.statManager().createRateStat("tunnel.duplicateIV", "Note that a duplicate IV was received", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); } @@ -39,4 +48,11 @@ public class BloomFilterIVValidator implements IVValidator { return !dup; // return true if it is OK, false if it isn't } public void destroy() { _filter.stopDecaying(); } + + private static int getShareBandwidth(RouterContext ctx) { + int irateKBps = ctx.bandwidthLimiter().getInboundKBytesPerSecond(); + int orateKBps = ctx.bandwidthLimiter().getOutboundKBytesPerSecond(); + double pct = ctx.router().getSharePercentage(); + return (int) (pct * Math.min(irateKBps, orateKBps)); + } } diff --git a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java index a578ee08a..ab6aa4cab 100644 --- a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java +++ b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java @@ -10,6 +10,7 @@ import net.i2p.data.SessionKey; import net.i2p.data.i2np.BuildRequestRecord; import net.i2p.data.i2np.TunnelBuildMessage; import net.i2p.util.DecayingBloomFilter; +import net.i2p.util.DecayingHashSet; import net.i2p.util.Log; /** @@ -22,7 +23,7 @@ public class BuildMessageProcessor { private DecayingBloomFilter _filter; public BuildMessageProcessor(I2PAppContext ctx) { - _filter = new DecayingBloomFilter(ctx, 60*1000, 32); + _filter = new DecayingHashSet(ctx, 60*1000, 32, "TunnelBMP"); ctx.statManager().createRateStat("tunnel.buildRequestDup", "How frequently we get dup build request messages", "Tunnels", new long[] { 60*1000, 10*60*1000 }); } /** From 1ecf4377c65907aa2227d753ebc49e0dd37815d5 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 03:52:14 +0000 Subject: [PATCH 07/14] * Client: - Fail if no date handshake after 30s or no leaseset after 5m, rather than hanging forever. --- core/java/src/net/i2p/client/I2PSessionImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 8fc308715..169562b7b 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -283,7 +283,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate"); sendMessage(new GetDateMessage()); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After getDate / begin waiting for a response"); + int waitcount = 0; while (!_dateReceived) { + if (waitcount++ > 30) + throw new IOException("no date handshake"); try { synchronized (_dateReceivedLock) { _dateReceivedLock.wait(1000); @@ -298,7 +301,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After producer.connect()"); // wait until we have created a lease set + waitcount = 0; while (_leaseSet == null) { + if (waitcount++ > 5*60) + throw new IOException("no leaseset"); synchronized (_leaseSetWait) { try { _leaseSetWait.wait(1000); From 7736545f5bf036ed59b7274cd41e126abeaeba4b Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 03:53:41 +0000 Subject: [PATCH 08/14] speed up data hashcodes --- core/java/src/net/i2p/data/Destination.java | 6 ++++-- core/java/src/net/i2p/data/Hash.java | 10 ++++++++-- core/java/src/net/i2p/data/LeaseSet.java | 8 ++++---- core/java/src/net/i2p/data/PrivateKey.java | 10 ++++++++-- core/java/src/net/i2p/data/PublicKey.java | 10 ++++++++-- core/java/src/net/i2p/data/RouterAddress.java | 6 +++--- core/java/src/net/i2p/data/RouterIdentity.java | 8 +++++--- core/java/src/net/i2p/data/SessionKey.java | 10 ++++++++-- core/java/src/net/i2p/data/Signature.java | 10 ++++++++-- core/java/src/net/i2p/data/SigningPrivateKey.java | 10 ++++++++-- core/java/src/net/i2p/data/SigningPublicKey.java | 10 ++++++++-- 11 files changed, 72 insertions(+), 26 deletions(-) diff --git a/core/java/src/net/i2p/data/Destination.java b/core/java/src/net/i2p/data/Destination.java index 5946056b3..7419e11ed 100644 --- a/core/java/src/net/i2p/data/Destination.java +++ b/core/java/src/net/i2p/data/Destination.java @@ -137,10 +137,12 @@ public class Destination extends DataStructureImpl { && DataHelper.eq(getPublicKey(), dst.getPublicKey()); } + /** the public key has enough randomness in it to use it by itself for speed */ @Override public int hashCode() { - return DataHelper.hashCode(getCertificate()) + DataHelper.hashCode(getSigningPublicKey()) - + DataHelper.hashCode(getPublicKey()); + if (_publicKey == null) + return 0; + return _publicKey.hashCode(); } @Override diff --git a/core/java/src/net/i2p/data/Hash.java b/core/java/src/net/i2p/data/Hash.java index 7dcce1ddb..a573b6835 100644 --- a/core/java/src/net/i2p/data/Hash.java +++ b/core/java/src/net/i2p/data/Hash.java @@ -147,9 +147,15 @@ public class Hash extends DataStructureImpl { return DataHelper.eq(_data, ((Hash) obj)._data); } + /** a Hash is a hash, so just use the first 4 bytes for speed */ @Override public int hashCode() { - return DataHelper.hashCode(_data); + int rv = 0; + if (_data != null) { + for (int i = 0; i < 4; i++) + rv ^= (_data[i] << (i*8)); + } + return rv; } @Override @@ -267,4 +273,4 @@ public class Hash extends DataStructureImpl { } _log.debug("Fill check test passed"); } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/LeaseSet.java b/core/java/src/net/i2p/data/LeaseSet.java index 3dcbf97a6..4a683c257 100644 --- a/core/java/src/net/i2p/data/LeaseSet.java +++ b/core/java/src/net/i2p/data/LeaseSet.java @@ -345,12 +345,12 @@ public class LeaseSet extends DataStructureImpl { } + /** the destination has enough randomness in it to use it by itself for speed */ @Override public int hashCode() { - return DataHelper.hashCode(getEncryptionKey()) + - //(int)_version + - DataHelper.hashCode(_leases) + DataHelper.hashCode(getSignature()) - + DataHelper.hashCode(getSigningKey()) + DataHelper.hashCode(getDestination()); + if (_destination == null) + return 0; + return _destination.hashCode(); } @Override diff --git a/core/java/src/net/i2p/data/PrivateKey.java b/core/java/src/net/i2p/data/PrivateKey.java index 7ea047314..eed7f1eb7 100644 --- a/core/java/src/net/i2p/data/PrivateKey.java +++ b/core/java/src/net/i2p/data/PrivateKey.java @@ -70,9 +70,15 @@ public class PrivateKey extends DataStructureImpl { return DataHelper.eq(_data, ((PrivateKey) obj)._data); } + /** the key has enough randomness in it, use the first 4 bytes for speed */ @Override public int hashCode() { - return DataHelper.hashCode(_data); + int rv = 0; + if (_data != null) { + for (int i = 0; i < 4; i++) + rv ^= (_data[i] << (i*8)); + } + return rv; } @Override @@ -100,4 +106,4 @@ public class PrivateKey extends DataStructureImpl { return KeyGenerator.getPublicKey(this); } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/PublicKey.java b/core/java/src/net/i2p/data/PublicKey.java index d653ea9a4..01bca46e5 100644 --- a/core/java/src/net/i2p/data/PublicKey.java +++ b/core/java/src/net/i2p/data/PublicKey.java @@ -72,9 +72,15 @@ public class PublicKey extends DataStructureImpl { return DataHelper.eq(_data, ((PublicKey) obj)._data); } + /** the key has enough randomness in it, use the first 4 bytes for speed */ @Override public int hashCode() { - return DataHelper.hashCode(_data); + int rv = 0; + if (_data != null) { + for (int i = 0; i < 4; i++) + rv ^= (_data[i] << (i*8)); + } + return rv; } @Override @@ -94,4 +100,4 @@ public class PublicKey extends DataStructureImpl { return buf.toString(); } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/RouterAddress.java b/core/java/src/net/i2p/data/RouterAddress.java index 393bf9621..f353f7d73 100644 --- a/core/java/src/net/i2p/data/RouterAddress.java +++ b/core/java/src/net/i2p/data/RouterAddress.java @@ -130,10 +130,10 @@ public class RouterAddress extends DataStructureImpl { && DataHelper.eq(getTransportStyle(), addr.getTransportStyle()); } + /** the style should be sufficient, for speed */ @Override public int hashCode() { - return getCost() + DataHelper.hashCode(getTransportStyle()) + DataHelper.hashCode(getExpiration()) - + DataHelper.hashCode(getOptions()); + return DataHelper.hashCode(getTransportStyle()); } @Override @@ -152,4 +152,4 @@ public class RouterAddress extends DataStructureImpl { buf.append("]"); return buf.toString(); } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/RouterIdentity.java b/core/java/src/net/i2p/data/RouterIdentity.java index d7375b290..94f4760c1 100644 --- a/core/java/src/net/i2p/data/RouterIdentity.java +++ b/core/java/src/net/i2p/data/RouterIdentity.java @@ -101,10 +101,12 @@ public class RouterIdentity extends DataStructureImpl { && DataHelper.eq(getPublicKey(), ident.getPublicKey()); } + /** the public key has enough randomness in it to use it by itself for speed */ @Override public int hashCode() { - return DataHelper.hashCode(getCertificate()) + DataHelper.hashCode(getSigningPublicKey()) - + DataHelper.hashCode(getPublicKey()); + if (_publicKey == null) + return 0; + return _publicKey.hashCode(); } @Override @@ -140,4 +142,4 @@ public class RouterIdentity extends DataStructureImpl { __calculatedHash = SHA256Generator.getInstance().calculateHash(identBytes); return __calculatedHash; } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/SessionKey.java b/core/java/src/net/i2p/data/SessionKey.java index b89bee505..1b2ae8a1e 100644 --- a/core/java/src/net/i2p/data/SessionKey.java +++ b/core/java/src/net/i2p/data/SessionKey.java @@ -76,9 +76,15 @@ public class SessionKey extends DataStructureImpl { return DataHelper.eq(_data, ((SessionKey) obj)._data); } + /** the key has enough randomness in it, use the first 4 bytes for speed */ @Override public int hashCode() { - return DataHelper.hashCode(_data); + int rv = 0; + if (_data != null) { + for (int i = 0; i < 4; i++) + rv ^= (_data[i] << (i*8)); + } + return rv; } @Override @@ -98,4 +104,4 @@ public class SessionKey extends DataStructureImpl { buf.append("]"); return buf.toString(); } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/Signature.java b/core/java/src/net/i2p/data/Signature.java index 41a56aacd..cb43741c5 100644 --- a/core/java/src/net/i2p/data/Signature.java +++ b/core/java/src/net/i2p/data/Signature.java @@ -62,9 +62,15 @@ public class Signature extends DataStructureImpl { return DataHelper.eq(_data, ((Signature) obj)._data); } + /** the sig has enough randomness in it, use the first 4 bytes for speed */ @Override public int hashCode() { - return DataHelper.hashCode(_data); + int rv = 0; + if (_data != null) { + for (int i = 0; i < 4; i++) + rv ^= (_data[i] << (i*8)); + } + return rv; } @Override @@ -83,4 +89,4 @@ public class Signature extends DataStructureImpl { buf.append("]"); return buf.toString(); } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/SigningPrivateKey.java b/core/java/src/net/i2p/data/SigningPrivateKey.java index 4a1a99ccb..1d2e95cb0 100644 --- a/core/java/src/net/i2p/data/SigningPrivateKey.java +++ b/core/java/src/net/i2p/data/SigningPrivateKey.java @@ -68,9 +68,15 @@ public class SigningPrivateKey extends DataStructureImpl { return DataHelper.eq(_data, ((SigningPrivateKey) obj)._data); } + /** the key has enough randomness in it, use the first 4 bytes for speed */ @Override public int hashCode() { - return DataHelper.hashCode(_data); + int rv = 0; + if (_data != null) { + for (int i = 0; i < 4; i++) + rv ^= (_data[i] << (i*8)); + } + return rv; } @Override @@ -96,4 +102,4 @@ public class SigningPrivateKey extends DataStructureImpl { public SigningPublicKey toPublic() { return KeyGenerator.getSigningPublicKey(this); } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/data/SigningPublicKey.java b/core/java/src/net/i2p/data/SigningPublicKey.java index 09097593f..44b8f3c13 100644 --- a/core/java/src/net/i2p/data/SigningPublicKey.java +++ b/core/java/src/net/i2p/data/SigningPublicKey.java @@ -67,9 +67,15 @@ public class SigningPublicKey extends DataStructureImpl { return DataHelper.eq(_data, ((SigningPublicKey) obj)._data); } + /** the key has enough randomness in it, use the first 4 bytes for speed */ @Override public int hashCode() { - return DataHelper.hashCode(_data); + int rv = 0; + if (_data != null) { + for (int i = 0; i < 4; i++) + rv ^= (_data[i] << (i*8)); + } + return rv; } @Override @@ -88,4 +94,4 @@ public class SigningPublicKey extends DataStructureImpl { buf.append("]"); return buf.toString(); } -} \ No newline at end of file +} From 4bc52158330a9a5c4586ce6166386847b85eb1bb Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 15:07:48 +0000 Subject: [PATCH 09/14] fix /i2p/B64 handling --- .../java/src/net/i2p/client/streaming/I2PSocketEepGet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java index 5cc8d694f..8aac29cff 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java @@ -107,7 +107,7 @@ public class I2PSocketEepGet extends EepGet { try { int slash = 1 + file.substring(1).indexOf("/"); host = file.substring(1, slash); - _actualURL = "http:/" + file.substring(slash); // get the extra slash from the substring + _actualURL = "http://" + host + file.substring(slash); } catch (IndexOutOfBoundsException ioobe) { throw new IOException("Bad /i2p/ format: " + _actualURL); } From 1f5d7d7b5bb49a3ccb0ad3f2ffa9b1f90f4dc6f9 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 15:08:21 +0000 Subject: [PATCH 10/14] dont reopen tunnel to say goodbye to tracker --- apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index c9087bc7e..46952d14a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -313,9 +313,9 @@ public class TrackerClient extends I2PAppThread try { // try to contact everybody we can - // We don't need I2CP connection for eepget - // if (!verifyConnected()) return; + // Don't try to restart I2CP connection just to say goodbye for (Iterator iter = trackers.iterator(); iter.hasNext(); ) { + if (!verifyConnected()) return; Tracker tr = (Tracker)iter.next(); if (tr.started && (!tr.stop) && tr.trackerProblems == null) doRequest(tr, infoHash, peerID, uploaded, From 787def6a1c76c45fc6def1dd1392a5b0b78f3020 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 15:27:46 +0000 Subject: [PATCH 11/14] * Tunnel: - Adjust the random drop probability for the message size --- .../router/tunnel/InboundGatewayReceiver.java | 2 +- .../router/tunnel/OutboundTunnelEndpoint.java | 2 +- .../i2p/router/tunnel/TunnelDispatcher.java | 19 +++++++++++++++++-- .../i2p/router/tunnel/TunnelParticipant.java | 2 +- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java index 822c1f637..382be541f 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java @@ -35,7 +35,7 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver { } } - if (_context.tunnelDispatcher().shouldDropParticipatingMessage()) + if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length)) return -1; _config.incrementSentMessages(); TunnelDataMessage msg = new TunnelDataMessage(_context); diff --git a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java index 9aa9e667b..32e4c2c2e 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java @@ -43,7 +43,7 @@ public class OutboundTunnelEndpoint { + (toTunnel != null ? toTunnel.getTunnelId() + "" : "")); // don't drop it if we are the target if ((!_context.routerHash().equals(toRouter)) && - _context.tunnelDispatcher().shouldDropParticipatingMessage()) + _context.tunnelDispatcher().shouldDropParticipatingMessage("OBEP " + msg.getType(), msg.getMessageSize())) return; _config.incrementSentMessages(); _outDistributor.distribute(msg, toRouter, toTunnel); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 6351c5855..b9da8d6d8 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -540,8 +540,17 @@ public class TunnelDispatcher implements Service { * This is similar to the code in ../RouterThrottleImpl.java * We drop in proportion to how far over the limit we are. * Perhaps an exponential function would be better? + * + * The drop probability is adjusted for the size of the message. + * At this stage, participants and IBGWs see a standard 1024 byte message. + * OBEPs however may see a wide variety of sizes. + * + * @param type unused except for logging + * @param length the length of the message */ - public boolean shouldDropParticipatingMessage() { + public boolean shouldDropParticipatingMessage(String type, int length) { + if (length <= 0) + return false; RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth"); if (rs == null) return false; @@ -574,13 +583,19 @@ public class TunnelDispatcher implements Service { float pctDrop = (used - maxBps) / used; if (pctDrop <= 0) return false; + // drop in proportion to size w.r.t. a standard 1024-byte message + // this is a little expensive but we want to adjust the curve between 0 and 1 + // Most messages are 1024, only at the OBEP do we see other sizes + if (length != 1024) + pctDrop = (float) Math.pow(pctDrop, 1024d / length); float rand = _context.random().nextFloat(); boolean reject = rand <= pctDrop; if (reject) { if (_log.shouldLog(Log.WARN)) { int availBps = (int) (((maxKBps*1024)*share) - used); _log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/" - + used + " %Drop = " + pctDrop); + + used + " %Drop = " + pctDrop + + ' ' + type + ' ' + length); } _context.statManager().addRateData("tunnel.participatingMessageDropped", 1, 0); } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index b9d660751..a73208e3b 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -150,7 +150,7 @@ public class TunnelParticipant { } private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) { - if (_context.tunnelDispatcher().shouldDropParticipatingMessage()) + if (_context.tunnelDispatcher().shouldDropParticipatingMessage("TDM", 1024)) return; _config.incrementSentMessages(); long oldId = msg.getUniqueId(); From e0dd1f13e31fe4c9ad7ad77e4e593da750afb783 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 15:29:23 +0000 Subject: [PATCH 12/14] keep publishing leaseset if we are restarting, to minimize downtime --- .../kademlia/KademliaNetworkDatabaseFacade.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index aafd45e7e..6ff84f68c 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -490,8 +490,13 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } if (!_context.clientManager().shouldPublishLeaseSet(h)) return; - if (_context.router().gracefulShutdownInProgress()) - return; + // If we're exiting, don't publish. + // If we're restarting, keep publishing to minimize the downtime. + if (_context.router().gracefulShutdownInProgress()) { + int code = _context.router().scheduledGracefulExitCode(); + if (code == Router.EXIT_GRACEFUL || code == Router.EXIT_HARD) + return; + } RepublishLeaseSetJob j = null; synchronized (_publishingLeaseSets) { From a3f290e4d8dad640b33321d3ad1f17f85f38b753 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 19:31:24 +0000 Subject: [PATCH 13/14] * Tunnel: - Adjust the random drop probability for the message size --- .../i2p/router/tunnel/TunnelDispatcher.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index b9da8d6d8..9cd67561d 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -545,7 +545,13 @@ public class TunnelDispatcher implements Service { * At this stage, participants and IBGWs see a standard 1024 byte message. * OBEPs however may see a wide variety of sizes. * - * @param type unused except for logging + * Network-wise, it's most efficient to drop OBEP messages, because they + * are unfragmented and we know their size. Therefore we drop the big ones + * and we drop a single wrapped I2CP message, not a fragment of one or more messages. + * Also, the OBEP is the earliest identifiable hop in the message's path + * (a plain participant could be earlier or later, but on average is later) + * + * @param type message hop location and type * @param length the length of the message */ public boolean shouldDropParticipatingMessage(String type, int length) { @@ -583,11 +589,18 @@ public class TunnelDispatcher implements Service { float pctDrop = (used - maxBps) / used; if (pctDrop <= 0) return false; + // increase the drop probability for OBEP, + // and lower it for IBGW, for network efficiency + double len = length; + if (type.startsWith("OBEP")) + len *= 1.5; + else if (type.startsWith("IBGW")) + len /= 1.5; // drop in proportion to size w.r.t. a standard 1024-byte message // this is a little expensive but we want to adjust the curve between 0 and 1 // Most messages are 1024, only at the OBEP do we see other sizes - if (length != 1024) - pctDrop = (float) Math.pow(pctDrop, 1024d / length); + if (len != 1024d) + pctDrop = (float) Math.pow(pctDrop, 1024d / len); float rand = _context.random().nextFloat(); boolean reject = rand <= pctDrop; if (reject) { From 2e2bff3f0aa06e31aa1c85b53434eef60c8478aa Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 27 Aug 2009 19:43:57 +0000 Subject: [PATCH 14/14] fix the favicon again --- apps/routerconsole/jsp/css.jsp | 2 +- apps/routerconsole/jsp/error.jsp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/routerconsole/jsp/css.jsp b/apps/routerconsole/jsp/css.jsp index 422d1329c..ed8675030 100644 --- a/apps/routerconsole/jsp/css.jsp +++ b/apps/routerconsole/jsp/css.jsp @@ -22,7 +22,7 @@ } %> - + " /> console.css" rel="stylesheet" type="text/css"> diff --git a/apps/routerconsole/jsp/error.jsp b/apps/routerconsole/jsp/error.jsp index 017efaa34..3628a0c15 100644 --- a/apps/routerconsole/jsp/error.jsp +++ b/apps/routerconsole/jsp/error.jsp @@ -14,7 +14,7 @@ %> I2P Router Console <%@include file="css.jsp" %> - + <% if (System.getProperty("router.consoleNonce") == null) { System.setProperty("router.consoleNonce", new java.util.Random().nextLong() + "");