diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index 31f5283232..900d18b1de 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -122,11 +122,12 @@ public class StreamSinkServer { long written = 0; int read = 0; while ( (read = in.read(buf)) != -1) { - _fos.write(buf, 0, read); + //_fos.write(buf, 0, read); written += read; if (_log.shouldLog(Log.DEBUG)) _log.debug("read and wrote " + read); } + _fos.write(("written: [" + written + "]\n").getBytes()); long lifetime = System.currentTimeMillis() - start; _log.error("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); } catch (IOException ioe) { @@ -150,7 +151,7 @@ public class StreamSinkServer { StreamSinkServer server = null; switch (args.length) { case 0: - server = new StreamSinkServer("dataDir", "server.key", "localhost", 10001); + server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654); break; case 2: server = new StreamSinkServer(args[0], args[1]); diff --git a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java index 74840d7fc8..b6f8c63386 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java @@ -78,7 +78,7 @@ public class RouterConsoleRunner { private void initialize(WebApplicationContext context) { String password = getPassword(); if (password != null) { - HashUserRealm realm = new HashUserRealm(); + HashUserRealm realm = new HashUserRealm("i2prouter"); realm.put("admin", password); realm.addUserToRole("admin", "routerAdmin"); context.setRealm(realm); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 1b49245c29..84e7d6bd6e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -344,8 +344,8 @@ public class MessageInputStream extends InputStream { + " readyBlocks=" + _readyDataBlocks.size() + " readTotal=" + _readTotal); } - if (removed) - _cache.release(cur); + //if (removed) + // _cache.release(cur); } } // for (int i = 0; i < length; i++) { } // synchronized (_dataLock) @@ -416,15 +416,16 @@ public class MessageInputStream extends InputStream { public void close() { synchronized (_dataLock) { - while (_readyDataBlocks.size() > 0) - _cache.release((ByteArray)_readyDataBlocks.remove(0)); + //while (_readyDataBlocks.size() > 0) + // _cache.release((ByteArray)_readyDataBlocks.remove(0)); + _readyDataBlocks.clear(); // we don't need the data, but we do need to keep track of the messageIds // received, so we can ACK accordingly for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) { ByteArray ba = (ByteArray)iter.next(); - //ba.setData(null); - _cache.release(ba); + ba.setData(null); + //_cache.release(ba); } _locallyClosed = true; _dataLock.notifyAll(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index ca11af9c04..d5d84d2ab4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -209,8 +209,8 @@ public class Packet { /** get the actual payload of the message. may be null */ public ByteArray getPayload() { return _payload; } public void setPayload(ByteArray payload) { - if ( (_payload != null) && (_payload != payload) ) - _cache.release(_payload); + //if ( (_payload != null) && (_payload != payload) ) + // _cache.release(_payload); _payload = payload; if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) throw new IllegalArgumentException("Too large payload: " + payload.getValid()); @@ -466,7 +466,7 @@ public class Packet { throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin); // skip ahead to the payload - _payload = _cache.acquire(); //new ByteArray(new byte[payloadSize]); + _payload = new ByteArray(new byte[payloadSize]); //_cache.acquire(); System.arraycopy(buffer, payloadBegin, _payload.getData(), 0, payloadSize); _payload.setValid(payloadSize); _payload.setOffset(0); diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 8fc8e60735..fa28afce19 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -78,7 +78,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** class that generates new messages */ protected I2CPMessageProducer _producer; /** map of integer --> MessagePayloadMessage */ - Map _availableMessages; + private Map _availableMessages; protected I2PClientMessageHandlerMap _handlerMap; @@ -290,7 +290,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * */ public byte[] receiveMessage(int msgId) throws I2PSessionException { - MessagePayloadMessage msg = (MessagePayloadMessage) _availableMessages.remove(new Integer(msgId)); + MessagePayloadMessage msg = null; + synchronized (_availableMessages) { + msg = (MessagePayloadMessage) _availableMessages.remove(new Integer(msgId)); + } if (msg == null) return null; return msg.getPayload().getUnencryptedData(); } @@ -339,7 +342,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Recieve a payload message and let the app know its available */ public void addNewMessage(MessagePayloadMessage msg) { - _availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg); + synchronized (_availableMessages) { + _availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg); + } int id = msg.getMessageId().getMessageId(); byte data[] = msg.getPayload().getUnencryptedData(); if ((data == null) || (data.length <= 0)) { diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java index 879afcf017..aadec1549c 100644 --- a/core/java/src/net/i2p/data/DataHelper.java +++ b/core/java/src/net/i2p/data/DataHelper.java @@ -361,7 +361,7 @@ public class DataHelper { */ public static void writeLong(OutputStream rawStream, int numBytes, long value) throws DataFormatException, IOException { - + if (value < 0) throw new DataFormatException("Value is negative (" + value + ")"); for (int i = numBytes - 1; i >= 0; i--) { byte cur = (byte)( (value >>> (i*8) ) & 0xFF); rawStream.write(cur); @@ -369,6 +369,7 @@ public class DataHelper { } public static byte[] toLong(int numBytes, long value) throws IllegalArgumentException { + if (value < 0) throw new IllegalArgumentException("Negative value not allowed"); byte val[] = new byte[numBytes]; toLong(val, 0, numBytes, value); return val; @@ -397,43 +398,6 @@ public class DataHelper { return rv; } - public static void main(String args[]) { - for (int i = 0; i <= 0xFF; i++) - testLong(1, i); - System.out.println("Test 1byte passed"); - for (long i = 0; i <= 0xFFFF; i++) - testLong(2, i); - System.out.println("Test 2byte passed"); - for (long i = 0; i <= 0xFFFFFF; i ++) - testLong(3, i); - System.out.println("Test 3byte passed"); - for (long i = 0; i <= 0xFFFFFFFF; i++) - testLong(4, i); - System.out.println("Test 4byte passed"); - for (long i = 0; i <= 0xFFFFFFFF; i++) - testLong(8, i); - System.out.println("Test 8byte passed"); - } - private static void testLong(int numBytes, long value) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(numBytes); - writeLong(baos, numBytes, value); - byte written[] = baos.toByteArray(); - byte extract[] = toLong(numBytes, value); - if (!eq(written, extract)) - throw new RuntimeException("testLong("+numBytes+","+value+") FAILED"); - - long read = fromLong(extract, 0, extract.length); - if (read != value) - throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on read (" + read + ")"); - read = readLong(new ByteArrayInputStream(written), numBytes); - if (read != value) - throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on readLong (" + read + ")"); - } catch (Exception e) { - throw new RuntimeException(e.getMessage()); - } - } - /** Read in a date from the stream as specified by the I2P data structure spec. * A date is an 8 byte unsigned integer in network byte order specifying the number of * milliseconds since midnight on January 1, 1970 in the GMT timezone. If the number is diff --git a/core/java/test/net/i2p/data/DataHelperTest.java b/core/java/test/net/i2p/data/DataHelperTest.java new file mode 100644 index 0000000000..486fe3ba4f --- /dev/null +++ b/core/java/test/net/i2p/data/DataHelperTest.java @@ -0,0 +1,160 @@ +package net.i2p.data; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + * basic unit tests for the DataHelper + * + */ +public class DataHelperTest { + private I2PAppContext _context; + private Log _log; + + public DataHelperTest(I2PAppContext ctx) { + _context = ctx; + _log = ctx.logManager().getLog(DataHelperTest.class); + } + + public void runTests() { + // long (read/write/to/from) + testLong(); + // date (read/write/to/from) + testDate(); + // string + // properties + // boolean + // readline + // compress + } + + /** + * Test to/from/read/writeLong with every 1, 2, and 4 byte value, as + * well as some 8 byte values. + */ + public void testLong() { + for (int i = 0; i <= 0xFF; i++) + testLong(1, i); + System.out.println("Test 1byte passed"); + for (long i = 0; i <= 0xFFFF; i++) + testLong(2, i); + System.out.println("Test 2byte passed"); + for (long i = 0; i <= 0xFFFFFF; i ++) + testLong(3, i); + System.out.println("Test 3byte passed"); + for (long i = 0; i <= 0xFFFFFFFF; i++) + testLong(4, i); + System.out.println("Test 4byte passed"); + // i know, doesnt test (2^63)-(2^64-1) + for (long i = Long.MAX_VALUE - 0xFFFFFFl; i < Long.MAX_VALUE; i++) + testLong(8, i); + System.out.println("Test 8byte passed"); + } + private static void testLong(int numBytes, long value) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(numBytes); + DataHelper.writeLong(baos, numBytes, value); + byte written[] = baos.toByteArray(); + byte extract[] = DataHelper.toLong(numBytes, value); + if (extract.length != numBytes) + throw new RuntimeException("testLong("+numBytes+","+value+") FAILED (len="+extract.length+")"); + if (!DataHelper.eq(written, extract)) + throw new RuntimeException("testLong("+numBytes+","+value+") FAILED"); + byte extract2[] = new byte[numBytes]; + DataHelper.toLong(extract2, 0, numBytes, value); + if (!DataHelper.eq(extract, extract2)) + throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on toLong"); + + long read = DataHelper.fromLong(extract, 0, numBytes); + if (read != value) + throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on read (" + read + ")"); + + ByteArrayInputStream bais = new ByteArrayInputStream(written); + read = DataHelper.readLong(bais, numBytes); + if (read != value) + throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on readLong (" + read + ")"); + read = DataHelper.fromLong(written, 0, numBytes); + if (read != value) + throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on fromLong (" + read + ")"); + } catch (Exception e) { + throw new RuntimeException("test(" + numBytes +","+ value +"): " + e.getMessage()); + } + } + + private void testDate() { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + cal.set(Calendar.YEAR, 1970); + cal.set(Calendar.MONTH, 0); + cal.set(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + testDate(cal.getTime()); + + cal.set(Calendar.SECOND, 1); + testDate(cal.getTime()); + + cal.set(Calendar.YEAR, 1999); + cal.set(Calendar.MONTH, 11); + cal.set(Calendar.DAY_OF_MONTH, 31); + cal.set(Calendar.HOUR_OF_DAY, 23); + cal.set(Calendar.MINUTE, 59); + cal.set(Calendar.SECOND, 59); + testDate(cal.getTime()); + + cal.set(Calendar.YEAR, 2000); + cal.set(Calendar.MONTH, 0); + cal.set(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + testDate(cal.getTime()); + + cal.setTimeInMillis(System.currentTimeMillis()); + testDate(cal.getTime()); + + cal.set(Calendar.SECOND, cal.get(Calendar.SECOND)+10); + testDate(cal.getTime()); + + try { + cal.set(Calendar.YEAR, 1969); + cal.set(Calendar.MONTH, 11); + cal.set(Calendar.DAY_OF_MONTH, 31); + cal.set(Calendar.HOUR_OF_DAY, 23); + cal.set(Calendar.MINUTE, 59); + cal.set(Calendar.SECOND, 59); + testDate(cal.getTime()); + System.err.println("foo! this should fail"); + } catch (RuntimeException re) { + // should fail on dates prior to the epoch + } + } + + private void testDate(Date when) { + try { + byte buf[] = new byte[DataHelper.DATE_LENGTH]; + DataHelper.toDate(buf, 0, when.getTime()); + byte tbuf[] = DataHelper.toDate(when); + if (!DataHelper.eq(tbuf, buf)) + throw new RuntimeException("testDate("+when.toString()+") failed on toDate"); + Date time = DataHelper.fromDate(buf, 0); + if (when.getTime() != time.getTime()) + throw new RuntimeException("testDate("+when.toString()+") failed (" + time.toString() + ")"); + System.out.println("eq: " + time); + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } + + public static void main(String args[]) { + DataHelperTest test = new DataHelperTest(I2PAppContext.getGlobalContext()); + test.runTests(); + } +} diff --git a/history.txt b/history.txt index c1025724f9..a584c0b229 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,23 @@ -$Id: history.txt,v 1.159 2005/02/26 19:03:42 jrandom Exp $ +$Id: history.txt,v 1.160 2005/02/27 17:09:37 jrandom Exp $ + +2005-03-01 jrandom + * Really disable the streaming lib packet caching + * Synchronized a message handling point in the SDK (even though its use is + already essentially single threaded, its better to play it safe) + * Don't add new RepublishLeaseSetJobs on failure, just requeue up the + existing one (duh) + * Throttle the number of concurrent pending tunnel builds across all + pools, in addition to simply throttling the number of new requests per + minute for each pool individually. This should avoid the cascading + failure when tunnel builds take too long, as no new builds will be + created until the previous ones are handled. + * Factored out and extended the DataHelper's unit tests for dealing with + long and date formatting. + * Explicitly specify the HTTP auth realm as "i2prouter", though this + alone doesn't address the bug where jetty asks for authentication too + much. (thanks orion!) + * Updated the StreamSinkServer to ignore all read bytes, rather than write + them to the filesystem. 2005-02-27 jrandom * Don't rerequest leaseSets if there are already pending requests diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 757d4f103c..464a5c8993 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.154 $ $Date: 2005/02/26 19:03:42 $"; + public final static String ID = "$Revision: 1.155 $ $Date: 2005/02/27 17:09:37 $"; public final static String VERSION = "0.5.0.1"; - public final static long BUILD = 6; + public final static long BUILD = 7; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java index b45697621f..a42d94ac79 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java @@ -76,9 +76,7 @@ public class RepublishLeaseSetJob extends JobImpl { public void runJob() { if (_log.shouldLog(Log.WARN)) _log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64()); - LeaseSet ls = _facade.lookupLeaseSetLocally(_dest); - if ( (ls != null) && (ls.isCurrent(0)) ) - getContext().jobQueue().addJob(new RepublishLeaseSetJob(getContext(), _facade, _dest)); + RepublishLeaseSetJob.this.requeue(30*1000); } } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/OnCreatedJob.java b/router/java/src/net/i2p/router/tunnel/pool/OnCreatedJob.java index 74b5ed28ff..4dc86f65f4 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/OnCreatedJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/OnCreatedJob.java @@ -29,6 +29,8 @@ class OnCreatedJob extends JobImpl { } else { getContext().tunnelDispatcher().joinOutbound(_cfg); } + + _pool.getManager().buildComplete(); _pool.addTunnel(_cfg); TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null); RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java index abc225e584..09f48eeca4 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java @@ -31,12 +31,16 @@ public class TunnelBuilder { buildTunnel(ctx, pool, false); } public void buildTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) { - if (!pool.isAlive()) return; + if (!pool.isAlive()) { + pool.getManager().buildComplete(); + return; + } // this is probably overkill (ya think?) pool.refreshSettings(); PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop); if (cfg == null) { + pool.getManager().buildComplete(); return; } OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg); @@ -111,6 +115,7 @@ public class TunnelBuilder { public String getName() { return "Tunnel create failed"; } public void runJob() { // yikes, nothing left, lets get some backup (if we're allowed) + _pool.getManager().buildComplete(); _pool.refreshBuilders(); } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index aaedc38d27..d217e69948 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -120,6 +120,13 @@ public class TunnelPool { if (build > (MAX_BUILDS_PER_MINUTE - _buildsThisMinute)) build = (MAX_BUILDS_PER_MINUTE - _buildsThisMinute); + int wanted = build; + build = _manager.allocateBuilds(build); + + if ( (wanted != build) && (_log.shouldLog(Log.ERROR)) ) + _log.error("Wanted to build " + wanted + " tunnels, but throttled down to " + + build + ", due to concurrent requests (cpu overload?)"); + for (int i = 0; i < build; i++) _builder.buildTunnel(_context, this); _buildsThisMinute += build; @@ -130,6 +137,8 @@ public class TunnelPool { } } + TunnelPoolManager getManager() { return _manager; } + void refreshSettings() { if (_settings.getDestination() != null) { return; // don't override client specified settings diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index a486ddb8db..a323f0d46d 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -40,6 +40,10 @@ public class TunnelPoolManager implements TunnelManagerFacade { private Map _clientOutboundPools; private TunnelPool _inboundExploratory; private TunnelPool _outboundExploratory; + /** how many build requests are in process */ + private int _outstandingBuilds; + /** max # of concurrent build requests */ + private int _maxOutstandingBuilds; public TunnelPoolManager(RouterContext ctx) { _context = ctx; @@ -53,6 +57,16 @@ public class TunnelPoolManager implements TunnelManagerFacade { _clientInboundPools = new HashMap(4); _clientOutboundPools = new HashMap(4); + _outstandingBuilds = 0; + _maxOutstandingBuilds = 10; + String max = ctx.getProperty("router.tunnel.maxConcurrentBuilds", "10"); + if (max != null) { + try { + _maxOutstandingBuilds = Integer.parseInt(max); + } catch (NumberFormatException nfe) { + _maxOutstandingBuilds = 10; + } + } ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long do successful tunnel tests take?", "Tunnels", @@ -248,6 +262,35 @@ public class TunnelPoolManager implements TunnelManagerFacade { outbound.shutdown(); } + /** + * Check to make sure we can build this many new tunnels (throttled so + * we don't build too many at a time across all pools). + * + * @param wanted how many tunnels the pool wants to build + * @return how many are allowed to be built + */ + int allocateBuilds(int wanted) { + synchronized (this) { + if (_outstandingBuilds >= _maxOutstandingBuilds) + return 0; + if (_outstandingBuilds + wanted < _maxOutstandingBuilds) { + _outstandingBuilds += wanted; + return wanted; + } else { + int allowed = _maxOutstandingBuilds - _outstandingBuilds; + _outstandingBuilds = _maxOutstandingBuilds; + return allowed; + } + } + } + + void buildComplete() { + synchronized (this) { + if (_outstandingBuilds > 0) + _outstandingBuilds--; + } + } + public void startup() { TunnelBuilder builder = new TunnelBuilder(); ExploratoryPeerSelector selector = new ExploratoryPeerSelector();