diff --git a/history.txt b/history.txt index 0174b02601..805a159a21 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,17 @@ -$Id: history.txt,v 1.45 2004/10/12 16:29:42 jrandom Exp $ +$Id: history.txt,v 1.46 2004/10/13 14:40:47 jrandom Exp $ + +2004-10-14 jrandom + * Allow for a configurable tunnel "growth factor", rather than trying + to achieve a steady state. This will let us grow gradually when + the router is needed more, rather than blindly accepting the request + or arbitrarily choking it at an averaged value. Configure this with + "router.tunnelGrowthFactor" in the router.config (default "1.5"). + * Adjust the tunnel test timeouts dynamically - rather than the old + flat 30s (!!!) timeout, we set the timeout to 2x the average tunnel + test time (the deviation factor can be adjusted by setting + "router.tunnelTestDeviation" to "3.0" or whatever). This should help + find the 'good' tunnels. + * Added some crazy debugging to try and track down an intermittent hang. 2004-10-13 jrandom * Fix the probabalistic tunnel reject (we always accepted everything, diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index a20c8bb46e..7780aa9c28 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -131,6 +131,7 @@ class RouterThrottleImpl implements RouterThrottle { } if (numTunnels > getMinThrottleTunnels()) { + double growthFactor = getTunnelGrowthFactor(); Rate avgTunnels = _context.statManager().getRate("tunnel.participatingTunnels").getRate(60*60*1000); if (avgTunnels != null) { double avg = 0; @@ -138,9 +139,9 @@ class RouterThrottleImpl implements RouterThrottle { avg = avgTunnels.getAverageValue(); else avg = avgTunnels.getLifetimeAverageValue(); - if ( (avg > 0) && (avg < numTunnels) ) { + if ( (avg > 0) && (avg*growthFactor < numTunnels) ) { // we're accelerating, lets try not to take on too much too fast - double probAccept = avg / numTunnels; + double probAccept = (avg*growthFactor) / numTunnels; int v = _context.random().nextInt(100); if (v < probAccept*100) { // ok @@ -171,8 +172,8 @@ class RouterThrottleImpl implements RouterThrottle { else avg60m = tunnelTestTime60m.getLifetimeAverageValue(); - if ( (avg60m > 0) && (avg10m > avg60m) ) { - double probAccept = avg60m/avg10m; + if ( (avg60m > 0) && (avg10m > avg60m * growthFactor) ) { + double probAccept = (avg60m*growthFactor)/avg10m; int v = _context.random().nextInt(100); if (v < probAccept*100) { // ok @@ -225,6 +226,14 @@ class RouterThrottleImpl implements RouterThrottle { } } + private double getTunnelGrowthFactor() { + try { + return Double.parseDouble(_context.getProperty("router.tunnelGrowthFactor", "1.5")); + } catch (NumberFormatException nfe) { + return 1.5; + } + } + public long getMessageDelay() { Rate delayRate = _context.statManager().getRate("transport.sendProcessingTime").getRate(60*1000); return (long)delayRate.getAverageValue(); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index dd63e65139..ef3b56c0ce 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.52 $ $Date: 2004/10/12 16:29:42 $"; + public final static String ID = "$Revision: 1.53 $ $Date: 2004/10/13 14:40:47 $"; public final static String VERSION = "0.4.1.2"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java index 2754330450..757db9f3af 100644 --- a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java @@ -54,6 +54,7 @@ public class SendTunnelMessageJob extends JobImpl { private long _timeout; private long _expiration; private int _priority; + private int _state; public SendTunnelMessageJob(RouterContext ctx, I2NPMessage msg, TunnelId tunnelId, Job onSend, ReplyJob onReply, Job onFailure, MessageSelector selector, long timeoutMs, int priority) { this(ctx, msg, tunnelId, null, null, onSend, onReply, onFailure, selector, timeoutMs, priority); @@ -61,11 +62,14 @@ public class SendTunnelMessageJob extends JobImpl { public SendTunnelMessageJob(RouterContext ctx, I2NPMessage msg, TunnelId tunnelId, Hash targetRouter, TunnelId targetTunnelId, Job onSend, ReplyJob onReply, Job onFailure, MessageSelector selector, long timeoutMs, int priority) { super(ctx); + _state = 0; _log = ctx.logManager().getLog(SendTunnelMessageJob.class); if (msg == null) throw new IllegalArgumentException("wtf, null message? sod off"); if (tunnelId == null) throw new IllegalArgumentException("wtf, no tunnelId? nuh uh"); + + _state = 1; _message = msg; _destRouter = targetRouter; _tunnelId = tunnelId; @@ -92,9 +96,11 @@ public class SendTunnelMessageJob extends JobImpl { } else { _expiration = getContext().clock().now() + timeoutMs; } + _state = 2; } public void runJob() { + _state = 3; TunnelInfo info = getContext().tunnelManager().getTunnelInfo(_tunnelId); if (info == null) { if (_log.shouldLog(Log.DEBUG)) @@ -108,7 +114,9 @@ public class SendTunnelMessageJob extends JobImpl { getContext().jobQueue().addJob(_onFailure); return; } else { + _state = 4; forwardToGateway(); + _state = 0; return; } } @@ -118,13 +126,19 @@ public class SendTunnelMessageJob extends JobImpl { if (isEndpoint(info)) { if (_log.shouldLog(Log.INFO)) _log.info("Tunnel message where we're both the gateway and the endpoint - honor instructions"); + _state = 5; honorInstructions(info); + _state = 0; return; } else if (isGateway(info)) { + _state = 6; handleAsGateway(info); + _state = 0; return; } else { + _state = 7; handleAsParticipant(info); + _state = 0; return; } } @@ -134,6 +148,7 @@ public class SendTunnelMessageJob extends JobImpl { * */ private void forwardToGateway() { + _state = 8; TunnelMessage msg = new TunnelMessage(getContext()); msg.setData(_message.toByteArray()); msg.setTunnelId(_tunnelId); @@ -148,6 +163,7 @@ public class SendTunnelMessageJob extends JobImpl { String bodyType = _message.getClass().getName(); getContext().messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + _state = 9; return; } @@ -157,6 +173,7 @@ public class SendTunnelMessageJob extends JobImpl { * */ private void handleAsGateway(TunnelInfo info) { + _state = 10; // since we are the gateway, we don't need to verify the data structures TunnelInfo us = getUs(info); if (us == null) { @@ -164,7 +181,9 @@ public class SendTunnelMessageJob extends JobImpl { _log.error("We are not participating in this /known/ tunnel - was the router reset?"); if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + _state = 11; } else { + _state = 12; // we're the gateway, so sign, encrypt, and forward to info.getNextHop() TunnelMessage msg = prepareMessage(info); if (msg == null) { @@ -172,6 +191,7 @@ public class SendTunnelMessageJob extends JobImpl { _log.error("wtf, unable to prepare a tunnel message to the next hop, when we're the gateway and hops remain? tunnel: " + info); if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + _state = 13; return; } if (_log.shouldLog(Log.DEBUG)) @@ -185,6 +205,7 @@ public class SendTunnelMessageJob extends JobImpl { + (now - _expiration) + "ms ago)"); if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + _state = 14; return; }else if (_expiration < now + 15*1000) { if (_log.shouldLog(Log.WARN)) @@ -198,6 +219,7 @@ public class SendTunnelMessageJob extends JobImpl { _selector, (int)(_expiration - getContext().clock().now()), _priority)); + _state = 15; } } @@ -207,6 +229,7 @@ public class SendTunnelMessageJob extends JobImpl { * */ private void handleAsParticipant(TunnelInfo info) { + _state = 16; // SendTunnelMessageJob shouldn't be used for participants! if (_log.shouldLog(Log.DEBUG)) _log.debug("SendTunnelMessageJob for a participant... ", getAddedBy()); @@ -216,6 +239,7 @@ public class SendTunnelMessageJob extends JobImpl { _log.error("Cannot inject non-tunnel messages as a participant!" + _message, getAddedBy()); if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + _state = 17; return; } @@ -227,18 +251,23 @@ public class SendTunnelMessageJob extends JobImpl { _log.error("No verification key for the participant? tunnel: " + info, getAddedBy()); if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + _state = 18; return; } boolean ok = struct.verifySignature(getContext(), info.getVerificationKey().getKey()); + _state = 19; if (!ok) { if (_log.shouldLog(Log.WARN)) _log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy()); if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + _state = 20; return; } else { + _state = 21; if (info.getNextHop() != null) { + _state = 22; if (_log.shouldLog(Log.INFO)) _log.info("Message for tunnel " + info.getTunnelId().getTunnelId() + " received where we're not the gateway and there are remaining hops, so forward it on to " + info.getNextHop().toBase64() + " via SendMessageDirectJob"); @@ -247,12 +276,15 @@ public class SendTunnelMessageJob extends JobImpl { (int)(_message.getMessageExpiration().getTime() - getContext().clock().now()), _priority); getContext().jobQueue().addJob(j); + _state = 23; return; } else { + _state = 24; if (_log.shouldLog(Log.ERROR)) _log.error("Should not be reached - participant, but no more hops?!"); if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + _state = 25; return; } } @@ -261,19 +293,23 @@ public class SendTunnelMessageJob extends JobImpl { /** find our place in the tunnel */ private TunnelInfo getUs(TunnelInfo info) { + _state = 26; Hash us = getContext().routerHash(); TunnelInfo lastUs = null; while (info != null) { if (us.equals(info.getThisHop())) lastUs = info; info = info.getNextHopInfo(); + _state = 28; } + _state = 27; return lastUs; } /** are we the endpoint for the tunnel? */ private boolean isEndpoint(TunnelInfo info) { TunnelInfo us = getUs(info); + _state = 29; if (us == null) return false; return (us.getNextHop() == null); } @@ -281,6 +317,7 @@ public class SendTunnelMessageJob extends JobImpl { /** are we the gateway for the tunnel? */ private boolean isGateway(TunnelInfo info) { TunnelInfo us = getUs(info); + _state = 30; if (us == null) return false; return (us.getSigningKey() != null); // only the gateway can sign } @@ -294,6 +331,7 @@ public class SendTunnelMessageJob extends JobImpl { * */ private TunnelMessage prepareMessage(TunnelInfo info) { + _state = 31; TunnelMessage msg = new TunnelMessage(getContext()); SessionKey key = getContext().keyGenerator().generateSessionKey(); @@ -307,6 +345,7 @@ public class SendTunnelMessageJob extends JobImpl { // but if we are, have the endpoint forward it appropriately. // note that this algorithm does not currently support instructing the endpoint to send to a Destination if (_destRouter != null) { + _state = 32; instructions.setRouter(_destRouter); if (_targetTunnelId != null) { if (_log.shouldLog(Log.DEBUG)) @@ -320,6 +359,7 @@ public class SendTunnelMessageJob extends JobImpl { instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_ROUTER); } } else { + _state = 33; if (_message instanceof DataMessage) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Instructions are for local message delivery at the endpoint with a DataMessage to be sent to a Destination"); @@ -334,19 +374,26 @@ public class SendTunnelMessageJob extends JobImpl { if (info == null) { if (_log.shouldLog(Log.WARN)) _log.warn("Tunnel info is null to send message " + _message); + _state = 34; return null; } else if ( (info.getEncryptionKey() == null) || (info.getEncryptionKey().getKey() == null) ) { if (_log.shouldLog(Log.WARN)) _log.warn("Tunnel encryption key is null when we're the gateway?! info: " + info); + _state = 35; return null; } + _state = 36; byte encryptedInstructions[] = encrypt(instructions, info.getEncryptionKey().getKey(), INSTRUCTIONS_PADDING); byte encryptedMessage[] = encrypt(_message, key, PAYLOAD_PADDING); + _state = 37; TunnelVerificationStructure verification = createVerificationStructure(encryptedMessage, info); + _state = 38; + String bodyType = _message.getClass().getName(); getContext().messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + _state = 39; if (_log.shouldLog(Log.DEBUG)) _log.debug("Tunnel message prepared: instructions = " + instructions); @@ -355,6 +402,7 @@ public class SendTunnelMessageJob extends JobImpl { msg.setEncryptedDeliveryInstructions(encryptedInstructions); msg.setTunnelId(_tunnelId); msg.setVerificationStructure(verification); + _state = 40; return msg; } @@ -363,9 +411,12 @@ public class SendTunnelMessageJob extends JobImpl { * */ private TunnelVerificationStructure createVerificationStructure(byte encryptedMessage[], TunnelInfo info) { + _state = 41; TunnelVerificationStructure struct = new TunnelVerificationStructure(); struct.setMessageHash(getContext().sha().calculateHash(encryptedMessage)); struct.sign(getContext(), info.getSigningKey().getKey()); + + _state = 42; return struct; } @@ -375,6 +426,7 @@ public class SendTunnelMessageJob extends JobImpl { * @param paddedSize minimum size to pad to */ private byte[] encrypt(DataStructure struct, SessionKey key, int paddedSize) { + _state = 43; try { ByteArrayOutputStream baos = new ByteArrayOutputStream(paddedSize); byte data[] = struct.toByteArray(); @@ -383,11 +435,13 @@ public class SendTunnelMessageJob extends JobImpl { byte iv[] = new byte[16]; Hash h = getContext().sha().calculateHash(key.getData()); System.arraycopy(h.getData(), 0, iv, 0, iv.length); + _state = 44; return getContext().aes().safeEncrypt(baos.toByteArray(), key, iv, paddedSize); } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Error writing out data to encrypt", ioe); } + _state = 45; return null; } @@ -398,6 +452,7 @@ public class SendTunnelMessageJob extends JobImpl { * */ private void honorInstructions(TunnelInfo info) { + _state = 46; if (_selector != null) createFakeOutNetMessage(); @@ -405,6 +460,7 @@ public class SendTunnelMessageJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Firing onSend as we're honoring the instructions"); getContext().jobQueue().addJob(_onSend); + _state = 47; } // since we are the gateway, we don't need to decrypt the delivery instructions or the payload @@ -412,9 +468,13 @@ public class SendTunnelMessageJob extends JobImpl { RouterIdentity ident = getContext().router().getRouterInfo().getIdentity(); if (_destRouter != null) { + _state = 48; honorSendRemote(info, ident); + _state = 49; } else { + _state = 50; honorSendLocal(info, ident); + _state = 51; } } @@ -424,6 +484,7 @@ public class SendTunnelMessageJob extends JobImpl { * */ private void honorSendRemote(TunnelInfo info, RouterIdentity ident) { + _state = 52; I2NPMessage msg = null; if (_targetTunnelId != null) { if (_log.shouldLog(Log.DEBUG)) @@ -438,7 +499,9 @@ public class SendTunnelMessageJob extends JobImpl { byte data[] = _message.toByteArray(); tmsg.setData(data); msg = tmsg; + _state = 53; } else { + _state = 54; if (_log.shouldLog(Log.DEBUG)) _log.debug("Forward " + _message.getClass().getName() + " message off to remote router " + _destRouter.toBase64()); @@ -464,15 +527,18 @@ public class SendTunnelMessageJob extends JobImpl { } } + _state = 55; String bodyType = _message.getClass().getName(); getContext().messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + _state = 56; // don't specify a selector, since createFakeOutNetMessage already does that SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _destRouter, _onSend, _onReply, _onFailure, null, (int)(timeLeft), _priority); + _state = 57; getContext().jobQueue().addJob(j); } @@ -483,16 +549,20 @@ public class SendTunnelMessageJob extends JobImpl { * */ private void honorSendLocal(TunnelInfo info, RouterIdentity ident) { + _state = 59; if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) { // its a network message targeting us... if (_log.shouldLog(Log.DEBUG)) _log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool"); + _state = 59; InNetMessage msg = new InNetMessage(getContext()); msg.setFromRouter(ident); msg.setFromRouterHash(ident.getHash()); msg.setMessage(_message); getContext().inNetMessagePool().add(msg); + _state = 60; } else { + _state = 61; if (_log.shouldLog(Log.DEBUG)) _log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool"); DataMessage msg = (DataMessage)_message; @@ -502,6 +572,7 @@ public class SendTunnelMessageJob extends JobImpl { _log.warn("Duplicate data message received [" + msg.getUniqueId() + " expiring on " + msg.getMessageExpiration() + "]"); getContext().messageHistory().droppedOtherMessage(msg); getContext().messageHistory().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(), "Duplicate"); + _state = 62; return; } @@ -518,10 +589,12 @@ public class SendTunnelMessageJob extends JobImpl { clientMessage.setReceptionInfo(receptionInfo); getContext().clientMessagePool().add(clientMessage); getContext().messageHistory().receivePayloadMessage(msg.getUniqueId()); + _state = 63; } } private void createFakeOutNetMessage() { + _state = 64; // now we create a fake outNetMessage to go onto the registry so we can select if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering a fake outNetMessage for the message tunneled locally since we have a selector"); @@ -538,7 +611,8 @@ public class SendTunnelMessageJob extends JobImpl { getContext().messageRegistry().registerPending(outM); // we dont really need the data outM.discardData(); + _state = 65; } - public String getName() { return "Send Tunnel Message"; } + public String getName() { return "Send Tunnel Message" + (_state == 0 ? "" : ""+_state); } } diff --git a/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java b/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java index 74ec14e7ad..7b620d3529 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TestTunnelJob.java @@ -22,6 +22,8 @@ import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; import net.i2p.router.TunnelSelectionCriteria; import net.i2p.router.message.SendTunnelMessageJob; +import net.i2p.stat.RateStat; +import net.i2p.stat.Rate; import net.i2p.util.Log; class TestTunnelJob extends JobImpl { @@ -71,9 +73,43 @@ class TestTunnelJob extends JobImpl { return false; } - private final static long TEST_TIMEOUT = 30*1000; // 30 seconds for a test to succeed + private final static long DEFAULT_TEST_TIMEOUT = 10*1000; // 10 seconds for a test to succeed + private final static long MINIMUM_TEST_TIMEOUT = 1*1000; // 1 second min private final static int TEST_PRIORITY = 100; + /** + * how long should we let tunnel tests go on for? + */ + private long getTunnelTestTimeout() { + long rv = DEFAULT_TEST_TIMEOUT; + RateStat rs = getContext().statManager().getRate("tunnel.testSuccessTime"); + if (rs != null) { + Rate r = rs.getRate(10*60*1000); + if (r != null) { + if (r.getLifetimeEventCount() > 0) { + if (r.getLastEventCount() <= 0) + rv = (long)(r.getLifetimeAverageValue() * getTunnelTestDeviationLimit()); + else + rv = (long)(r.getAverageValue() * getTunnelTestDeviationLimit()); + } + } + } + if (rv < MINIMUM_TEST_TIMEOUT) + rv = MINIMUM_TEST_TIMEOUT; + return rv; + } + + /** + * How much greater than the current average tunnel test time should we accept? + */ + private double getTunnelTestDeviationLimit() { + try { + return Double.parseDouble(getContext().getProperty("router.tunnelTestDeviation", "2.0")); + } catch (NumberFormatException nfe) { + return 2.0; + } + } + /** * Send a message out the tunnel with instructions to send the message back * to ourselves and wait for it to arrive. @@ -96,7 +132,7 @@ class TestTunnelJob extends JobImpl { TestFailedJob failureJob = new TestFailedJob(); MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId()); - SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY); + SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY); getContext().jobQueue().addJob(testJob); } @@ -121,7 +157,7 @@ class TestTunnelJob extends JobImpl { TestFailedJob failureJob = new TestFailedJob(); MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId()); - SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY); + SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY); getContext().jobQueue().addJob(j); } @@ -216,6 +252,11 @@ class TestTunnelJob extends JobImpl { if (_log.shouldLog(Log.INFO)) _log.info("Test of tunnel " + _primaryId+ " successfull after " + time + "ms waiting for " + _nonce); + + if (time > getTunnelTestTimeout()) { + return; // the test failed job should already have run + } + TunnelInfo info = _pool.getTunnelInfo(_primaryId); if (info != null) { TestTunnelJob.this.getContext().messageHistory().tunnelValid(info, time); @@ -254,7 +295,7 @@ class TestTunnelJob extends JobImpl { _id = id; _tunnelId = tunnelId; _found = false; - _expiration = getContext().clock().now() + TEST_TIMEOUT; + _expiration = getContext().clock().now() + getTunnelTestTimeout(); if (_log.shouldLog(Log.DEBUG)) _log.debug("the expiration while testing tunnel " + tunnelId + " waiting for nonce " + id + ": " + new Date(_expiration));