2005-03-01 jrandom

* Really disable the streaming lib packet caching
    * Synchronized a message handling point in the SDK (even though its use is
      already essentially single threaded, its better to play it safe)
    * Don't add new RepublishLeaseSetJobs on failure, just requeue up the
      existing one (duh)
    * Throttle the number of concurrent pending tunnel builds across all
      pools, in addition to simply throttling the number of new requests per
      minute for each pool individually.  This should avoid the cascading
      failure when tunnel builds take too long, as no new builds will be
      created until the previous ones are handled.
    * Factored out and extended the DataHelper's unit tests for dealing with
      long and date formatting.
    * Explicitly specify the HTTP auth realm as "i2prouter", though this
      alone doesn't address the bug where jetty asks for authentication too
      much.  (thanks orion!)
    * Updated the StreamSinkServer to ignore all read bytes, rather than write
      them to the filesystem.
This commit is contained in:
jrandom
2005-03-01 17:50:52 +00:00
committed by zzz
parent 469a0852d7
commit 57d6a2f645
14 changed files with 267 additions and 60 deletions

View File

@ -122,11 +122,12 @@ public class StreamSinkServer {
long written = 0;
int read = 0;
while ( (read = in.read(buf)) != -1) {
_fos.write(buf, 0, read);
//_fos.write(buf, 0, read);
written += read;
if (_log.shouldLog(Log.DEBUG))
_log.debug("read and wrote " + read);
}
_fos.write(("written: [" + written + "]\n").getBytes());
long lifetime = System.currentTimeMillis() - start;
_log.error("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
} catch (IOException ioe) {
@ -150,7 +151,7 @@ public class StreamSinkServer {
StreamSinkServer server = null;
switch (args.length) {
case 0:
server = new StreamSinkServer("dataDir", "server.key", "localhost", 10001);
server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654);
break;
case 2:
server = new StreamSinkServer(args[0], args[1]);

View File

@ -78,7 +78,7 @@ public class RouterConsoleRunner {
private void initialize(WebApplicationContext context) {
String password = getPassword();
if (password != null) {
HashUserRealm realm = new HashUserRealm();
HashUserRealm realm = new HashUserRealm("i2prouter");
realm.put("admin", password);
realm.addUserToRole("admin", "routerAdmin");
context.setRealm(realm);

View File

@ -344,8 +344,8 @@ public class MessageInputStream extends InputStream {
+ " readyBlocks=" + _readyDataBlocks.size()
+ " readTotal=" + _readTotal);
}
if (removed)
_cache.release(cur);
//if (removed)
// _cache.release(cur);
}
} // for (int i = 0; i < length; i++) {
} // synchronized (_dataLock)
@ -416,15 +416,16 @@ public class MessageInputStream extends InputStream {
public void close() {
synchronized (_dataLock) {
while (_readyDataBlocks.size() > 0)
_cache.release((ByteArray)_readyDataBlocks.remove(0));
//while (_readyDataBlocks.size() > 0)
// _cache.release((ByteArray)_readyDataBlocks.remove(0));
_readyDataBlocks.clear();
// we don't need the data, but we do need to keep track of the messageIds
// received, so we can ACK accordingly
for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
ByteArray ba = (ByteArray)iter.next();
//ba.setData(null);
_cache.release(ba);
ba.setData(null);
//_cache.release(ba);
}
_locallyClosed = true;
_dataLock.notifyAll();

View File

@ -209,8 +209,8 @@ public class Packet {
/** get the actual payload of the message. may be null */
public ByteArray getPayload() { return _payload; }
public void setPayload(ByteArray payload) {
if ( (_payload != null) && (_payload != payload) )
_cache.release(_payload);
//if ( (_payload != null) && (_payload != payload) )
// _cache.release(_payload);
_payload = payload;
if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) )
throw new IllegalArgumentException("Too large payload: " + payload.getValid());
@ -466,7 +466,7 @@ public class Packet {
throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin);
// skip ahead to the payload
_payload = _cache.acquire(); //new ByteArray(new byte[payloadSize]);
_payload = new ByteArray(new byte[payloadSize]); //_cache.acquire();
System.arraycopy(buffer, payloadBegin, _payload.getData(), 0, payloadSize);
_payload.setValid(payloadSize);
_payload.setOffset(0);

View File

@ -78,7 +78,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** class that generates new messages */
protected I2CPMessageProducer _producer;
/** map of integer --> MessagePayloadMessage */
Map _availableMessages;
private Map _availableMessages;
protected I2PClientMessageHandlerMap _handlerMap;
@ -290,7 +290,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
*
*/
public byte[] receiveMessage(int msgId) throws I2PSessionException {
MessagePayloadMessage msg = (MessagePayloadMessage) _availableMessages.remove(new Integer(msgId));
MessagePayloadMessage msg = null;
synchronized (_availableMessages) {
msg = (MessagePayloadMessage) _availableMessages.remove(new Integer(msgId));
}
if (msg == null) return null;
return msg.getPayload().getUnencryptedData();
}
@ -339,7 +342,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* Recieve a payload message and let the app know its available
*/
public void addNewMessage(MessagePayloadMessage msg) {
_availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg);
synchronized (_availableMessages) {
_availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg);
}
int id = msg.getMessageId().getMessageId();
byte data[] = msg.getPayload().getUnencryptedData();
if ((data == null) || (data.length <= 0)) {

View File

@ -361,7 +361,7 @@ public class DataHelper {
*/
public static void writeLong(OutputStream rawStream, int numBytes, long value)
throws DataFormatException, IOException {
if (value < 0) throw new DataFormatException("Value is negative (" + value + ")");
for (int i = numBytes - 1; i >= 0; i--) {
byte cur = (byte)( (value >>> (i*8) ) & 0xFF);
rawStream.write(cur);
@ -369,6 +369,7 @@ public class DataHelper {
}
public static byte[] toLong(int numBytes, long value) throws IllegalArgumentException {
if (value < 0) throw new IllegalArgumentException("Negative value not allowed");
byte val[] = new byte[numBytes];
toLong(val, 0, numBytes, value);
return val;
@ -397,43 +398,6 @@ public class DataHelper {
return rv;
}
public static void main(String args[]) {
for (int i = 0; i <= 0xFF; i++)
testLong(1, i);
System.out.println("Test 1byte passed");
for (long i = 0; i <= 0xFFFF; i++)
testLong(2, i);
System.out.println("Test 2byte passed");
for (long i = 0; i <= 0xFFFFFF; i ++)
testLong(3, i);
System.out.println("Test 3byte passed");
for (long i = 0; i <= 0xFFFFFFFF; i++)
testLong(4, i);
System.out.println("Test 4byte passed");
for (long i = 0; i <= 0xFFFFFFFF; i++)
testLong(8, i);
System.out.println("Test 8byte passed");
}
private static void testLong(int numBytes, long value) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(numBytes);
writeLong(baos, numBytes, value);
byte written[] = baos.toByteArray();
byte extract[] = toLong(numBytes, value);
if (!eq(written, extract))
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED");
long read = fromLong(extract, 0, extract.length);
if (read != value)
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on read (" + read + ")");
read = readLong(new ByteArrayInputStream(written), numBytes);
if (read != value)
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on readLong (" + read + ")");
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
/** Read in a date from the stream as specified by the I2P data structure spec.
* A date is an 8 byte unsigned integer in network byte order specifying the number of
* milliseconds since midnight on January 1, 1970 in the GMT timezone. If the number is

View File

@ -0,0 +1,160 @@
package net.i2p.data;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
/**
* basic unit tests for the DataHelper
*
*/
public class DataHelperTest {
private I2PAppContext _context;
private Log _log;
public DataHelperTest(I2PAppContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(DataHelperTest.class);
}
public void runTests() {
// long (read/write/to/from)
testLong();
// date (read/write/to/from)
testDate();
// string
// properties
// boolean
// readline
// compress
}
/**
* Test to/from/read/writeLong with every 1, 2, and 4 byte value, as
* well as some 8 byte values.
*/
public void testLong() {
for (int i = 0; i <= 0xFF; i++)
testLong(1, i);
System.out.println("Test 1byte passed");
for (long i = 0; i <= 0xFFFF; i++)
testLong(2, i);
System.out.println("Test 2byte passed");
for (long i = 0; i <= 0xFFFFFF; i ++)
testLong(3, i);
System.out.println("Test 3byte passed");
for (long i = 0; i <= 0xFFFFFFFF; i++)
testLong(4, i);
System.out.println("Test 4byte passed");
// i know, doesnt test (2^63)-(2^64-1)
for (long i = Long.MAX_VALUE - 0xFFFFFFl; i < Long.MAX_VALUE; i++)
testLong(8, i);
System.out.println("Test 8byte passed");
}
private static void testLong(int numBytes, long value) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(numBytes);
DataHelper.writeLong(baos, numBytes, value);
byte written[] = baos.toByteArray();
byte extract[] = DataHelper.toLong(numBytes, value);
if (extract.length != numBytes)
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED (len="+extract.length+")");
if (!DataHelper.eq(written, extract))
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED");
byte extract2[] = new byte[numBytes];
DataHelper.toLong(extract2, 0, numBytes, value);
if (!DataHelper.eq(extract, extract2))
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on toLong");
long read = DataHelper.fromLong(extract, 0, numBytes);
if (read != value)
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on read (" + read + ")");
ByteArrayInputStream bais = new ByteArrayInputStream(written);
read = DataHelper.readLong(bais, numBytes);
if (read != value)
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on readLong (" + read + ")");
read = DataHelper.fromLong(written, 0, numBytes);
if (read != value)
throw new RuntimeException("testLong("+numBytes+","+value+") FAILED on fromLong (" + read + ")");
} catch (Exception e) {
throw new RuntimeException("test(" + numBytes +","+ value +"): " + e.getMessage());
}
}
private void testDate() {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.set(Calendar.YEAR, 1970);
cal.set(Calendar.MONTH, 0);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
testDate(cal.getTime());
cal.set(Calendar.SECOND, 1);
testDate(cal.getTime());
cal.set(Calendar.YEAR, 1999);
cal.set(Calendar.MONTH, 11);
cal.set(Calendar.DAY_OF_MONTH, 31);
cal.set(Calendar.HOUR_OF_DAY, 23);
cal.set(Calendar.MINUTE, 59);
cal.set(Calendar.SECOND, 59);
testDate(cal.getTime());
cal.set(Calendar.YEAR, 2000);
cal.set(Calendar.MONTH, 0);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
testDate(cal.getTime());
cal.setTimeInMillis(System.currentTimeMillis());
testDate(cal.getTime());
cal.set(Calendar.SECOND, cal.get(Calendar.SECOND)+10);
testDate(cal.getTime());
try {
cal.set(Calendar.YEAR, 1969);
cal.set(Calendar.MONTH, 11);
cal.set(Calendar.DAY_OF_MONTH, 31);
cal.set(Calendar.HOUR_OF_DAY, 23);
cal.set(Calendar.MINUTE, 59);
cal.set(Calendar.SECOND, 59);
testDate(cal.getTime());
System.err.println("foo! this should fail");
} catch (RuntimeException re) {
// should fail on dates prior to the epoch
}
}
private void testDate(Date when) {
try {
byte buf[] = new byte[DataHelper.DATE_LENGTH];
DataHelper.toDate(buf, 0, when.getTime());
byte tbuf[] = DataHelper.toDate(when);
if (!DataHelper.eq(tbuf, buf))
throw new RuntimeException("testDate("+when.toString()+") failed on toDate");
Date time = DataHelper.fromDate(buf, 0);
if (when.getTime() != time.getTime())
throw new RuntimeException("testDate("+when.toString()+") failed (" + time.toString() + ")");
System.out.println("eq: " + time);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
public static void main(String args[]) {
DataHelperTest test = new DataHelperTest(I2PAppContext.getGlobalContext());
test.runTests();
}
}

View File

@ -1,4 +1,23 @@
$Id: history.txt,v 1.159 2005/02/26 19:03:42 jrandom Exp $
$Id: history.txt,v 1.160 2005/02/27 17:09:37 jrandom Exp $
2005-03-01 jrandom
* Really disable the streaming lib packet caching
* Synchronized a message handling point in the SDK (even though its use is
already essentially single threaded, its better to play it safe)
* Don't add new RepublishLeaseSetJobs on failure, just requeue up the
existing one (duh)
* Throttle the number of concurrent pending tunnel builds across all
pools, in addition to simply throttling the number of new requests per
minute for each pool individually. This should avoid the cascading
failure when tunnel builds take too long, as no new builds will be
created until the previous ones are handled.
* Factored out and extended the DataHelper's unit tests for dealing with
long and date formatting.
* Explicitly specify the HTTP auth realm as "i2prouter", though this
alone doesn't address the bug where jetty asks for authentication too
much. (thanks orion!)
* Updated the StreamSinkServer to ignore all read bytes, rather than write
them to the filesystem.
2005-02-27 jrandom
* Don't rerequest leaseSets if there are already pending requests

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.154 $ $Date: 2005/02/26 19:03:42 $";
public final static String ID = "$Revision: 1.155 $ $Date: 2005/02/27 17:09:37 $";
public final static String VERSION = "0.5.0.1";
public final static long BUILD = 6;
public final static long BUILD = 7;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -76,9 +76,7 @@ public class RepublishLeaseSetJob extends JobImpl {
public void runJob() {
if (_log.shouldLog(Log.WARN))
_log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64());
LeaseSet ls = _facade.lookupLeaseSetLocally(_dest);
if ( (ls != null) && (ls.isCurrent(0)) )
getContext().jobQueue().addJob(new RepublishLeaseSetJob(getContext(), _facade, _dest));
RepublishLeaseSetJob.this.requeue(30*1000);
}
}
}

View File

@ -29,6 +29,8 @@ class OnCreatedJob extends JobImpl {
} else {
getContext().tunnelDispatcher().joinOutbound(_cfg);
}
_pool.getManager().buildComplete();
_pool.addTunnel(_cfg);
TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null);
RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool);

View File

@ -31,12 +31,16 @@ public class TunnelBuilder {
buildTunnel(ctx, pool, false);
}
public void buildTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) {
if (!pool.isAlive()) return;
if (!pool.isAlive()) {
pool.getManager().buildComplete();
return;
}
// this is probably overkill (ya think?)
pool.refreshSettings();
PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop);
if (cfg == null) {
pool.getManager().buildComplete();
return;
}
OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg);
@ -111,6 +115,7 @@ public class TunnelBuilder {
public String getName() { return "Tunnel create failed"; }
public void runJob() {
// yikes, nothing left, lets get some backup (if we're allowed)
_pool.getManager().buildComplete();
_pool.refreshBuilders();
}
}

View File

@ -120,6 +120,13 @@ public class TunnelPool {
if (build > (MAX_BUILDS_PER_MINUTE - _buildsThisMinute))
build = (MAX_BUILDS_PER_MINUTE - _buildsThisMinute);
int wanted = build;
build = _manager.allocateBuilds(build);
if ( (wanted != build) && (_log.shouldLog(Log.ERROR)) )
_log.error("Wanted to build " + wanted + " tunnels, but throttled down to "
+ build + ", due to concurrent requests (cpu overload?)");
for (int i = 0; i < build; i++)
_builder.buildTunnel(_context, this);
_buildsThisMinute += build;
@ -130,6 +137,8 @@ public class TunnelPool {
}
}
TunnelPoolManager getManager() { return _manager; }
void refreshSettings() {
if (_settings.getDestination() != null) {
return; // don't override client specified settings

View File

@ -40,6 +40,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
private Map _clientOutboundPools;
private TunnelPool _inboundExploratory;
private TunnelPool _outboundExploratory;
/** how many build requests are in process */
private int _outstandingBuilds;
/** max # of concurrent build requests */
private int _maxOutstandingBuilds;
public TunnelPoolManager(RouterContext ctx) {
_context = ctx;
@ -53,6 +57,16 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_clientInboundPools = new HashMap(4);
_clientOutboundPools = new HashMap(4);
_outstandingBuilds = 0;
_maxOutstandingBuilds = 10;
String max = ctx.getProperty("router.tunnel.maxConcurrentBuilds", "10");
if (max != null) {
try {
_maxOutstandingBuilds = Integer.parseInt(max);
} catch (NumberFormatException nfe) {
_maxOutstandingBuilds = 10;
}
}
ctx.statManager().createRateStat("tunnel.testSuccessTime",
"How long do successful tunnel tests take?", "Tunnels",
@ -248,6 +262,35 @@ public class TunnelPoolManager implements TunnelManagerFacade {
outbound.shutdown();
}
/**
* Check to make sure we can build this many new tunnels (throttled so
* we don't build too many at a time across all pools).
*
* @param wanted how many tunnels the pool wants to build
* @return how many are allowed to be built
*/
int allocateBuilds(int wanted) {
synchronized (this) {
if (_outstandingBuilds >= _maxOutstandingBuilds)
return 0;
if (_outstandingBuilds + wanted < _maxOutstandingBuilds) {
_outstandingBuilds += wanted;
return wanted;
} else {
int allowed = _maxOutstandingBuilds - _outstandingBuilds;
_outstandingBuilds = _maxOutstandingBuilds;
return allowed;
}
}
}
void buildComplete() {
synchronized (this) {
if (_outstandingBuilds > 0)
_outstandingBuilds--;
}
}
public void startup() {
TunnelBuilder builder = new TunnelBuilder();
ExploratoryPeerSelector selector = new ExploratoryPeerSelector();