2004-10-14 jrandom
* Allow for a configurable tunnel "growth factor", rather than trying to achieve a steady state. This will let us grow gradually when the router is needed more, rather than blindly accepting the request or arbitrarily choking it at an averaged value. Configure this with "router.tunnelGrowthFactor" in the router.config (default "1.5"). * Adjust the tunnel test timeouts dynamically - rather than the old flat 30s (!!!) timeout, we set the timeout to 2x the average tunnel test time (the deviation factor can be adjusted by setting "router.tunnelTestDeviation" to "3.0" or whatever). This should help find the 'good' tunnels. * Added some crazy debugging to try and track down an intermittent hang.
This commit is contained in:
15
history.txt
15
history.txt
@ -1,4 +1,17 @@
|
||||
$Id: history.txt,v 1.45 2004/10/12 16:29:42 jrandom Exp $
|
||||
$Id: history.txt,v 1.46 2004/10/13 14:40:47 jrandom Exp $
|
||||
|
||||
2004-10-14 jrandom
|
||||
* Allow for a configurable tunnel "growth factor", rather than trying
|
||||
to achieve a steady state. This will let us grow gradually when
|
||||
the router is needed more, rather than blindly accepting the request
|
||||
or arbitrarily choking it at an averaged value. Configure this with
|
||||
"router.tunnelGrowthFactor" in the router.config (default "1.5").
|
||||
* Adjust the tunnel test timeouts dynamically - rather than the old
|
||||
flat 30s (!!!) timeout, we set the timeout to 2x the average tunnel
|
||||
test time (the deviation factor can be adjusted by setting
|
||||
"router.tunnelTestDeviation" to "3.0" or whatever). This should help
|
||||
find the 'good' tunnels.
|
||||
* Added some crazy debugging to try and track down an intermittent hang.
|
||||
|
||||
2004-10-13 jrandom
|
||||
* Fix the probabalistic tunnel reject (we always accepted everything,
|
||||
|
@ -131,6 +131,7 @@ class RouterThrottleImpl implements RouterThrottle {
|
||||
}
|
||||
|
||||
if (numTunnels > getMinThrottleTunnels()) {
|
||||
double growthFactor = getTunnelGrowthFactor();
|
||||
Rate avgTunnels = _context.statManager().getRate("tunnel.participatingTunnels").getRate(60*60*1000);
|
||||
if (avgTunnels != null) {
|
||||
double avg = 0;
|
||||
@ -138,9 +139,9 @@ class RouterThrottleImpl implements RouterThrottle {
|
||||
avg = avgTunnels.getAverageValue();
|
||||
else
|
||||
avg = avgTunnels.getLifetimeAverageValue();
|
||||
if ( (avg > 0) && (avg < numTunnels) ) {
|
||||
if ( (avg > 0) && (avg*growthFactor < numTunnels) ) {
|
||||
// we're accelerating, lets try not to take on too much too fast
|
||||
double probAccept = avg / numTunnels;
|
||||
double probAccept = (avg*growthFactor) / numTunnels;
|
||||
int v = _context.random().nextInt(100);
|
||||
if (v < probAccept*100) {
|
||||
// ok
|
||||
@ -171,8 +172,8 @@ class RouterThrottleImpl implements RouterThrottle {
|
||||
else
|
||||
avg60m = tunnelTestTime60m.getLifetimeAverageValue();
|
||||
|
||||
if ( (avg60m > 0) && (avg10m > avg60m) ) {
|
||||
double probAccept = avg60m/avg10m;
|
||||
if ( (avg60m > 0) && (avg10m > avg60m * growthFactor) ) {
|
||||
double probAccept = (avg60m*growthFactor)/avg10m;
|
||||
int v = _context.random().nextInt(100);
|
||||
if (v < probAccept*100) {
|
||||
// ok
|
||||
@ -225,6 +226,14 @@ class RouterThrottleImpl implements RouterThrottle {
|
||||
}
|
||||
}
|
||||
|
||||
private double getTunnelGrowthFactor() {
|
||||
try {
|
||||
return Double.parseDouble(_context.getProperty("router.tunnelGrowthFactor", "1.5"));
|
||||
} catch (NumberFormatException nfe) {
|
||||
return 1.5;
|
||||
}
|
||||
}
|
||||
|
||||
public long getMessageDelay() {
|
||||
Rate delayRate = _context.statManager().getRate("transport.sendProcessingTime").getRate(60*1000);
|
||||
return (long)delayRate.getAverageValue();
|
||||
|
@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.52 $ $Date: 2004/10/12 16:29:42 $";
|
||||
public final static String ID = "$Revision: 1.53 $ $Date: 2004/10/13 14:40:47 $";
|
||||
public final static String VERSION = "0.4.1.2";
|
||||
public final static long BUILD = 2;
|
||||
public final static long BUILD = 3;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION);
|
||||
System.out.println("Router ID: " + RouterVersion.ID);
|
||||
|
@ -54,6 +54,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
private long _timeout;
|
||||
private long _expiration;
|
||||
private int _priority;
|
||||
private int _state;
|
||||
|
||||
public SendTunnelMessageJob(RouterContext ctx, I2NPMessage msg, TunnelId tunnelId, Job onSend, ReplyJob onReply, Job onFailure, MessageSelector selector, long timeoutMs, int priority) {
|
||||
this(ctx, msg, tunnelId, null, null, onSend, onReply, onFailure, selector, timeoutMs, priority);
|
||||
@ -61,11 +62,14 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
|
||||
public SendTunnelMessageJob(RouterContext ctx, I2NPMessage msg, TunnelId tunnelId, Hash targetRouter, TunnelId targetTunnelId, Job onSend, ReplyJob onReply, Job onFailure, MessageSelector selector, long timeoutMs, int priority) {
|
||||
super(ctx);
|
||||
_state = 0;
|
||||
_log = ctx.logManager().getLog(SendTunnelMessageJob.class);
|
||||
if (msg == null)
|
||||
throw new IllegalArgumentException("wtf, null message? sod off");
|
||||
if (tunnelId == null)
|
||||
throw new IllegalArgumentException("wtf, no tunnelId? nuh uh");
|
||||
|
||||
_state = 1;
|
||||
_message = msg;
|
||||
_destRouter = targetRouter;
|
||||
_tunnelId = tunnelId;
|
||||
@ -92,9 +96,11 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
} else {
|
||||
_expiration = getContext().clock().now() + timeoutMs;
|
||||
}
|
||||
_state = 2;
|
||||
}
|
||||
|
||||
public void runJob() {
|
||||
_state = 3;
|
||||
TunnelInfo info = getContext().tunnelManager().getTunnelInfo(_tunnelId);
|
||||
if (info == null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -108,7 +114,9 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
return;
|
||||
} else {
|
||||
_state = 4;
|
||||
forwardToGateway();
|
||||
_state = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -118,13 +126,19 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
if (isEndpoint(info)) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Tunnel message where we're both the gateway and the endpoint - honor instructions");
|
||||
_state = 5;
|
||||
honorInstructions(info);
|
||||
_state = 0;
|
||||
return;
|
||||
} else if (isGateway(info)) {
|
||||
_state = 6;
|
||||
handleAsGateway(info);
|
||||
_state = 0;
|
||||
return;
|
||||
} else {
|
||||
_state = 7;
|
||||
handleAsParticipant(info);
|
||||
_state = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -134,6 +148,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private void forwardToGateway() {
|
||||
_state = 8;
|
||||
TunnelMessage msg = new TunnelMessage(getContext());
|
||||
msg.setData(_message.toByteArray());
|
||||
msg.setTunnelId(_tunnelId);
|
||||
@ -148,6 +163,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
String bodyType = _message.getClass().getName();
|
||||
getContext().messageHistory().wrap(bodyType, _message.getUniqueId(),
|
||||
TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
_state = 9;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -157,6 +173,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private void handleAsGateway(TunnelInfo info) {
|
||||
_state = 10;
|
||||
// since we are the gateway, we don't need to verify the data structures
|
||||
TunnelInfo us = getUs(info);
|
||||
if (us == null) {
|
||||
@ -164,7 +181,9 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
_log.error("We are not participating in this /known/ tunnel - was the router reset?");
|
||||
if (_onFailure != null)
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
_state = 11;
|
||||
} else {
|
||||
_state = 12;
|
||||
// we're the gateway, so sign, encrypt, and forward to info.getNextHop()
|
||||
TunnelMessage msg = prepareMessage(info);
|
||||
if (msg == null) {
|
||||
@ -172,6 +191,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
_log.error("wtf, unable to prepare a tunnel message to the next hop, when we're the gateway and hops remain? tunnel: " + info);
|
||||
if (_onFailure != null)
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
_state = 13;
|
||||
return;
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -185,6 +205,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
+ (now - _expiration) + "ms ago)");
|
||||
if (_onFailure != null)
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
_state = 14;
|
||||
return;
|
||||
}else if (_expiration < now + 15*1000) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
@ -198,6 +219,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
_selector,
|
||||
(int)(_expiration - getContext().clock().now()),
|
||||
_priority));
|
||||
_state = 15;
|
||||
}
|
||||
}
|
||||
|
||||
@ -207,6 +229,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private void handleAsParticipant(TunnelInfo info) {
|
||||
_state = 16;
|
||||
// SendTunnelMessageJob shouldn't be used for participants!
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("SendTunnelMessageJob for a participant... ", getAddedBy());
|
||||
@ -216,6 +239,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
_log.error("Cannot inject non-tunnel messages as a participant!" + _message, getAddedBy());
|
||||
if (_onFailure != null)
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
_state = 17;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -227,18 +251,23 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
_log.error("No verification key for the participant? tunnel: " + info, getAddedBy());
|
||||
if (_onFailure != null)
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
_state = 18;
|
||||
return;
|
||||
}
|
||||
|
||||
boolean ok = struct.verifySignature(getContext(), info.getVerificationKey().getKey());
|
||||
_state = 19;
|
||||
if (!ok) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy());
|
||||
if (_onFailure != null)
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
_state = 20;
|
||||
return;
|
||||
} else {
|
||||
_state = 21;
|
||||
if (info.getNextHop() != null) {
|
||||
_state = 22;
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Message for tunnel " + info.getTunnelId().getTunnelId() + " received where we're not the gateway and there are remaining hops, so forward it on to "
|
||||
+ info.getNextHop().toBase64() + " via SendMessageDirectJob");
|
||||
@ -247,12 +276,15 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
(int)(_message.getMessageExpiration().getTime() - getContext().clock().now()),
|
||||
_priority);
|
||||
getContext().jobQueue().addJob(j);
|
||||
_state = 23;
|
||||
return;
|
||||
} else {
|
||||
_state = 24;
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Should not be reached - participant, but no more hops?!");
|
||||
if (_onFailure != null)
|
||||
getContext().jobQueue().addJob(_onFailure);
|
||||
_state = 25;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -261,19 +293,23 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
|
||||
/** find our place in the tunnel */
|
||||
private TunnelInfo getUs(TunnelInfo info) {
|
||||
_state = 26;
|
||||
Hash us = getContext().routerHash();
|
||||
TunnelInfo lastUs = null;
|
||||
while (info != null) {
|
||||
if (us.equals(info.getThisHop()))
|
||||
lastUs = info;
|
||||
info = info.getNextHopInfo();
|
||||
_state = 28;
|
||||
}
|
||||
_state = 27;
|
||||
return lastUs;
|
||||
}
|
||||
|
||||
/** are we the endpoint for the tunnel? */
|
||||
private boolean isEndpoint(TunnelInfo info) {
|
||||
TunnelInfo us = getUs(info);
|
||||
_state = 29;
|
||||
if (us == null) return false;
|
||||
return (us.getNextHop() == null);
|
||||
}
|
||||
@ -281,6 +317,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
/** are we the gateway for the tunnel? */
|
||||
private boolean isGateway(TunnelInfo info) {
|
||||
TunnelInfo us = getUs(info);
|
||||
_state = 30;
|
||||
if (us == null) return false;
|
||||
return (us.getSigningKey() != null); // only the gateway can sign
|
||||
}
|
||||
@ -294,6 +331,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private TunnelMessage prepareMessage(TunnelInfo info) {
|
||||
_state = 31;
|
||||
TunnelMessage msg = new TunnelMessage(getContext());
|
||||
|
||||
SessionKey key = getContext().keyGenerator().generateSessionKey();
|
||||
@ -307,6 +345,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
// but if we are, have the endpoint forward it appropriately.
|
||||
// note that this algorithm does not currently support instructing the endpoint to send to a Destination
|
||||
if (_destRouter != null) {
|
||||
_state = 32;
|
||||
instructions.setRouter(_destRouter);
|
||||
if (_targetTunnelId != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -320,6 +359,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_ROUTER);
|
||||
}
|
||||
} else {
|
||||
_state = 33;
|
||||
if (_message instanceof DataMessage) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Instructions are for local message delivery at the endpoint with a DataMessage to be sent to a Destination");
|
||||
@ -334,19 +374,26 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
if (info == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Tunnel info is null to send message " + _message);
|
||||
_state = 34;
|
||||
return null;
|
||||
} else if ( (info.getEncryptionKey() == null) || (info.getEncryptionKey().getKey() == null) ) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Tunnel encryption key is null when we're the gateway?! info: " + info);
|
||||
_state = 35;
|
||||
return null;
|
||||
}
|
||||
|
||||
_state = 36;
|
||||
byte encryptedInstructions[] = encrypt(instructions, info.getEncryptionKey().getKey(), INSTRUCTIONS_PADDING);
|
||||
byte encryptedMessage[] = encrypt(_message, key, PAYLOAD_PADDING);
|
||||
_state = 37;
|
||||
TunnelVerificationStructure verification = createVerificationStructure(encryptedMessage, info);
|
||||
|
||||
_state = 38;
|
||||
|
||||
String bodyType = _message.getClass().getName();
|
||||
getContext().messageHistory().wrap(bodyType, _message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
_state = 39;
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Tunnel message prepared: instructions = " + instructions);
|
||||
@ -355,6 +402,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
msg.setEncryptedDeliveryInstructions(encryptedInstructions);
|
||||
msg.setTunnelId(_tunnelId);
|
||||
msg.setVerificationStructure(verification);
|
||||
_state = 40;
|
||||
return msg;
|
||||
}
|
||||
|
||||
@ -363,9 +411,12 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private TunnelVerificationStructure createVerificationStructure(byte encryptedMessage[], TunnelInfo info) {
|
||||
_state = 41;
|
||||
TunnelVerificationStructure struct = new TunnelVerificationStructure();
|
||||
struct.setMessageHash(getContext().sha().calculateHash(encryptedMessage));
|
||||
struct.sign(getContext(), info.getSigningKey().getKey());
|
||||
|
||||
_state = 42;
|
||||
return struct;
|
||||
}
|
||||
|
||||
@ -375,6 +426,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
* @param paddedSize minimum size to pad to
|
||||
*/
|
||||
private byte[] encrypt(DataStructure struct, SessionKey key, int paddedSize) {
|
||||
_state = 43;
|
||||
try {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(paddedSize);
|
||||
byte data[] = struct.toByteArray();
|
||||
@ -383,11 +435,13 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
byte iv[] = new byte[16];
|
||||
Hash h = getContext().sha().calculateHash(key.getData());
|
||||
System.arraycopy(h.getData(), 0, iv, 0, iv.length);
|
||||
_state = 44;
|
||||
return getContext().aes().safeEncrypt(baos.toByteArray(), key, iv, paddedSize);
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error writing out data to encrypt", ioe);
|
||||
}
|
||||
_state = 45;
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -398,6 +452,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private void honorInstructions(TunnelInfo info) {
|
||||
_state = 46;
|
||||
if (_selector != null)
|
||||
createFakeOutNetMessage();
|
||||
|
||||
@ -405,6 +460,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Firing onSend as we're honoring the instructions");
|
||||
getContext().jobQueue().addJob(_onSend);
|
||||
_state = 47;
|
||||
}
|
||||
|
||||
// since we are the gateway, we don't need to decrypt the delivery instructions or the payload
|
||||
@ -412,9 +468,13 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
RouterIdentity ident = getContext().router().getRouterInfo().getIdentity();
|
||||
|
||||
if (_destRouter != null) {
|
||||
_state = 48;
|
||||
honorSendRemote(info, ident);
|
||||
_state = 49;
|
||||
} else {
|
||||
_state = 50;
|
||||
honorSendLocal(info, ident);
|
||||
_state = 51;
|
||||
}
|
||||
}
|
||||
|
||||
@ -424,6 +484,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private void honorSendRemote(TunnelInfo info, RouterIdentity ident) {
|
||||
_state = 52;
|
||||
I2NPMessage msg = null;
|
||||
if (_targetTunnelId != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -438,7 +499,9 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
byte data[] = _message.toByteArray();
|
||||
tmsg.setData(data);
|
||||
msg = tmsg;
|
||||
_state = 53;
|
||||
} else {
|
||||
_state = 54;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Forward " + _message.getClass().getName()
|
||||
+ " message off to remote router " + _destRouter.toBase64());
|
||||
@ -464,15 +527,18 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
}
|
||||
}
|
||||
|
||||
_state = 55;
|
||||
String bodyType = _message.getClass().getName();
|
||||
getContext().messageHistory().wrap(bodyType, _message.getUniqueId(),
|
||||
TunnelMessage.class.getName(), msg.getUniqueId());
|
||||
_state = 56;
|
||||
|
||||
// don't specify a selector, since createFakeOutNetMessage already does that
|
||||
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _destRouter,
|
||||
_onSend, _onReply, _onFailure,
|
||||
null, (int)(timeLeft),
|
||||
_priority);
|
||||
_state = 57;
|
||||
getContext().jobQueue().addJob(j);
|
||||
}
|
||||
|
||||
@ -483,16 +549,20 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
*
|
||||
*/
|
||||
private void honorSendLocal(TunnelInfo info, RouterIdentity ident) {
|
||||
_state = 59;
|
||||
if ( (info.getDestination() == null) || !(_message instanceof DataMessage) ) {
|
||||
// its a network message targeting us...
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Destination is null or its not a DataMessage - pass it off to the InNetMessagePool");
|
||||
_state = 59;
|
||||
InNetMessage msg = new InNetMessage(getContext());
|
||||
msg.setFromRouter(ident);
|
||||
msg.setFromRouterHash(ident.getHash());
|
||||
msg.setMessage(_message);
|
||||
getContext().inNetMessagePool().add(msg);
|
||||
_state = 60;
|
||||
} else {
|
||||
_state = 61;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Destination is not null and it is a DataMessage - pop it into the ClientMessagePool");
|
||||
DataMessage msg = (DataMessage)_message;
|
||||
@ -502,6 +572,7 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
_log.warn("Duplicate data message received [" + msg.getUniqueId() + " expiring on " + msg.getMessageExpiration() + "]");
|
||||
getContext().messageHistory().droppedOtherMessage(msg);
|
||||
getContext().messageHistory().messageProcessingError(msg.getUniqueId(), msg.getClass().getName(), "Duplicate");
|
||||
_state = 62;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -518,10 +589,12 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
clientMessage.setReceptionInfo(receptionInfo);
|
||||
getContext().clientMessagePool().add(clientMessage);
|
||||
getContext().messageHistory().receivePayloadMessage(msg.getUniqueId());
|
||||
_state = 63;
|
||||
}
|
||||
}
|
||||
|
||||
private void createFakeOutNetMessage() {
|
||||
_state = 64;
|
||||
// now we create a fake outNetMessage to go onto the registry so we can select
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Registering a fake outNetMessage for the message tunneled locally since we have a selector");
|
||||
@ -538,7 +611,8 @@ public class SendTunnelMessageJob extends JobImpl {
|
||||
getContext().messageRegistry().registerPending(outM);
|
||||
// we dont really need the data
|
||||
outM.discardData();
|
||||
_state = 65;
|
||||
}
|
||||
|
||||
public String getName() { return "Send Tunnel Message"; }
|
||||
public String getName() { return "Send Tunnel Message" + (_state == 0 ? "" : ""+_state); }
|
||||
}
|
||||
|
@ -22,6 +22,8 @@ import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelInfo;
|
||||
import net.i2p.router.TunnelSelectionCriteria;
|
||||
import net.i2p.router.message.SendTunnelMessageJob;
|
||||
import net.i2p.stat.RateStat;
|
||||
import net.i2p.stat.Rate;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
class TestTunnelJob extends JobImpl {
|
||||
@ -71,9 +73,43 @@ class TestTunnelJob extends JobImpl {
|
||||
return false;
|
||||
}
|
||||
|
||||
private final static long TEST_TIMEOUT = 30*1000; // 30 seconds for a test to succeed
|
||||
private final static long DEFAULT_TEST_TIMEOUT = 10*1000; // 10 seconds for a test to succeed
|
||||
private final static long MINIMUM_TEST_TIMEOUT = 1*1000; // 1 second min
|
||||
private final static int TEST_PRIORITY = 100;
|
||||
|
||||
/**
|
||||
* how long should we let tunnel tests go on for?
|
||||
*/
|
||||
private long getTunnelTestTimeout() {
|
||||
long rv = DEFAULT_TEST_TIMEOUT;
|
||||
RateStat rs = getContext().statManager().getRate("tunnel.testSuccessTime");
|
||||
if (rs != null) {
|
||||
Rate r = rs.getRate(10*60*1000);
|
||||
if (r != null) {
|
||||
if (r.getLifetimeEventCount() > 0) {
|
||||
if (r.getLastEventCount() <= 0)
|
||||
rv = (long)(r.getLifetimeAverageValue() * getTunnelTestDeviationLimit());
|
||||
else
|
||||
rv = (long)(r.getAverageValue() * getTunnelTestDeviationLimit());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (rv < MINIMUM_TEST_TIMEOUT)
|
||||
rv = MINIMUM_TEST_TIMEOUT;
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* How much greater than the current average tunnel test time should we accept?
|
||||
*/
|
||||
private double getTunnelTestDeviationLimit() {
|
||||
try {
|
||||
return Double.parseDouble(getContext().getProperty("router.tunnelTestDeviation", "2.0"));
|
||||
} catch (NumberFormatException nfe) {
|
||||
return 2.0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message out the tunnel with instructions to send the message back
|
||||
* to ourselves and wait for it to arrive.
|
||||
@ -96,7 +132,7 @@ class TestTunnelJob extends JobImpl {
|
||||
|
||||
TestFailedJob failureJob = new TestFailedJob();
|
||||
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
|
||||
SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY);
|
||||
SendTunnelMessageJob testJob = new SendTunnelMessageJob(getContext(), msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY);
|
||||
getContext().jobQueue().addJob(testJob);
|
||||
}
|
||||
|
||||
@ -121,7 +157,7 @@ class TestTunnelJob extends JobImpl {
|
||||
|
||||
TestFailedJob failureJob = new TestFailedJob();
|
||||
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
|
||||
SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY);
|
||||
SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, getTunnelTestTimeout(), TEST_PRIORITY);
|
||||
getContext().jobQueue().addJob(j);
|
||||
}
|
||||
|
||||
@ -216,6 +252,11 @@ class TestTunnelJob extends JobImpl {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Test of tunnel " + _primaryId+ " successfull after "
|
||||
+ time + "ms waiting for " + _nonce);
|
||||
|
||||
if (time > getTunnelTestTimeout()) {
|
||||
return; // the test failed job should already have run
|
||||
}
|
||||
|
||||
TunnelInfo info = _pool.getTunnelInfo(_primaryId);
|
||||
if (info != null) {
|
||||
TestTunnelJob.this.getContext().messageHistory().tunnelValid(info, time);
|
||||
@ -254,7 +295,7 @@ class TestTunnelJob extends JobImpl {
|
||||
_id = id;
|
||||
_tunnelId = tunnelId;
|
||||
_found = false;
|
||||
_expiration = getContext().clock().now() + TEST_TIMEOUT;
|
||||
_expiration = getContext().clock().now() + getTunnelTestTimeout();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("the expiration while testing tunnel " + tunnelId
|
||||
+ " waiting for nonce " + id + ": " + new Date(_expiration));
|
||||
|
Reference in New Issue
Block a user