2006-01-25 jrandom
* Run the peer profile coalescing/reorganization outside the job queue
  (on one of the timers), to cut down on some job queue congestion. Also,
  trim old profiles while running, not just when starting up.
* Slightly more sane intra-floodfill-node netDb activity (only flood new entries)
* Workaround in the I2PTunnelHTTPServer for some bad requests (though the
  source of the bug is not yet addressed)
* Better I2PSnark reconnection handling
* Further cleanup in the new tunnel build process
* Make sure we expire old participants properly
* Remove much of the transient overload throttling (it wasn't using a good metric)
@@ -17,11 +17,11 @@ class RouterThrottleImpl implements RouterThrottle {
     private Log _log;
 
     /**
-     * arbitrary hard limit of 2 seconds - if its taking this long to get
+     * arbitrary hard limit of 10 seconds - if its taking this long to get
      * to a job, we're congested.
      *
      */
-    private static int JOB_LAG_LIMIT = 2000;
+    private static int JOB_LAG_LIMIT = 10*1000;
     /**
      * Arbitrary hard limit - if we throttle our network connection this many
      * times in the previous 2 minute period, don't accept requests to
@@ -56,6 +56,7 @@ class RouterThrottleImpl implements RouterThrottle {
     }
 
     public boolean acceptNetworkMessage() {
+        //if (true) return true;
         long lag = _context.jobQueue().getMaxLag();
         if ( (lag > JOB_LAG_LIMIT) && (_context.router().getUptime() > 60*1000) ) {
             if (_log.shouldLog(Log.DEBUG))
@@ -87,6 +88,7 @@ class RouterThrottleImpl implements RouterThrottle {
     }
 
         long lag = _context.jobQueue().getMaxLag();
+        /*
         RateStat rs = _context.statManager().getRate("router.throttleNetworkCause");
         Rate r = null;
         if (rs != null)
@@ -100,11 +102,13 @@ class RouterThrottleImpl implements RouterThrottle {
             _context.statManager().addRateData("router.throttleTunnelCause", lag, lag);
             return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD;
         }
+        */
 
-        rs = _context.statManager().getRate("transport.sendProcessingTime");
-        r = null;
+        RateStat rs = _context.statManager().getRate("transport.sendProcessingTime");
+        Rate r = null;
+        /*
         if (rs != null)
-            r = rs.getRate(10*60*1000);
+            r = rs.getRate(1*60*1000);
         double processTime = (r != null ? r.getAverageValue() : 0);
         if (processTime > 2000) {
             if (_log.shouldLog(Log.DEBUG))
@@ -113,9 +117,10 @@ class RouterThrottleImpl implements RouterThrottle {
             _context.statManager().addRateData("router.throttleTunnelProcessingTime10m", (long)processTime, (long)processTime);
             return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD;
         }
+        */
         if (rs != null)
             r = rs.getRate(60*1000);
-        processTime = (r != null ? r.getAverageValue() : 0);
+        double processTime = (r != null ? r.getAverageValue() : 0);
         if (processTime > 2000) {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Refusing tunnel request with the job lag of " + lag
@@ -124,6 +129,7 @@ class RouterThrottleImpl implements RouterThrottle {
             return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD;
         }
 
+        /*
         rs = _context.statManager().getRate("transport.sendMessageFailureLifetime");
         r = null;
         if (rs != null)
@@ -142,6 +148,7 @@ class RouterThrottleImpl implements RouterThrottle {
                 }
             }
         }
+        */
 
         int numTunnels = _context.tunnelManager().getParticipatingCount();
 
@@ -251,7 +258,7 @@ class RouterThrottleImpl implements RouterThrottle {
 
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Accepting a new tunnel request (now allocating " + bytesAllocated + " bytes across " + numTunnels
-                       + " tunnels with lag of " + lag + " and " + throttleEvents + " throttle events)");
+                       + " tunnels with lag of " + lag + ")");
         return TUNNEL_ACCEPT;
     }
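
Note: with the stat-based checks commented out above, the only transient-overload gate left is the job-queue lag test. A minimal standalone sketch of that gate (class and field names here are illustrative, not the actual I2P code):

    // Sketch: accept new work only if the job scheduler isn't badly lagged.
    class LagThrottle {
        static final long JOB_LAG_LIMIT = 10 * 1000; // ms, mirrors the diff above
        private final long startTime = System.currentTimeMillis();

        boolean acceptNetworkMessage(long maxLagMs) {
            long uptime = System.currentTimeMillis() - startTime;
            // Ignore lag during the first minute after startup, as above.
            if (maxLagMs > JOB_LAG_LIMIT && uptime > 60 * 1000)
                return false; // congested - refuse the message
            return true;
        }
    }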
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
  *
  */
 public class RouterVersion {
-    public final static String ID = "$Revision: 1.338 $ $Date: 2006/01/22 19:51:56 $";
+    public final static String ID = "$Revision: 1.339 $ $Date: 2006/01/25 10:34:31 $";
     public final static String VERSION = "0.6.1.9";
-    public final static long BUILD = 7;
+    public final static long BUILD = 8;
     public static void main(String args[]) {
         System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
         System.out.println("Router ID: " + RouterVersion.ID);
@@ -322,7 +322,8 @@ public class ClientConnectionRunner {
      */
     void receiveMessage(Destination toDest, Destination fromDest, Payload payload) {
         if (_dead) return;
-        _context.jobQueue().addJob(new MessageReceivedJob(_context, this, toDest, fromDest, payload));
+        MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload);
+        j.runJob();
     }
 
     /**
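
Note: this is the first of several places in this commit where a job is now run inline instead of queued, trading queue latency for work on the caller's thread. The pattern in isolation (Job and Dispatcher here are illustrative stand-ins, not the I2P interfaces):

    interface Job { void runJob(); }

    class Dispatcher {
        // Before: enqueue(job) - the work waits behind everything else.
        // After: run short, non-blocking jobs inline to cut queue congestion.
        void dispatch(Job job, boolean inline) {
            if (inline)
                job.runJob();   // executes immediately on the caller's thread
            else
                enqueue(job);   // defers to the shared job queue
        }
        void enqueue(Job job) { /* hand off to a worker thread */ }
    }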
@@ -82,7 +82,9 @@ public class HandleGarlicMessageJob extends JobImpl implements GarlicMessageRece
                     SendMessageDirectJob j = new SendMessageDirectJob(getContext(), data,
                                                                       instructions.getRouter(),
                                                                       10*1000, 100);
-                    getContext().jobQueue().addJob(j);
+                    // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
+                    j.runJob();
+                    //getContext().jobQueue().addJob(j);
                 }
                 return;
             case DeliveryInstructions.DELIVERY_MODE_TUNNEL:
@@ -90,9 +92,12 @@ public class HandleGarlicMessageJob extends JobImpl implements GarlicMessageRece
                 gw.setMessage(data);
                 gw.setTunnelId(instructions.getTunnelId());
                 gw.setMessageExpiration(data.getMessageExpiration());
-                getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), gw,
+                SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw,
                                                                         instructions.getRouter(),
-                                                                        10*1000, 100));
+                                                                        10*1000, 100);
+                // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
+                job.runJob();
+                // getContext().jobQueue().addJob(job);
                 return;
             default:
                 _log.error("Unknown instruction " + instructions.getDeliveryMode() + ": " + instructions);
@@ -117,6 +117,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
+        ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
         _clientMessage = msg;
         _clientMessageId = msg.getMessageId();
@@ -335,8 +336,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
             // set to null if there are no tunnels to ack the reply back through
             // (should we always fail for this?  or should we send it anyway, even if
             // we dont receive the reply?  hmm...)
-            if (_log.shouldLog(Log.ERROR))
-                _log.error(getJobId() + ": Unable to create the garlic message (no tunnels left) to " + _toString);
+            if (_log.shouldLog(Log.WARN))
+                _log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left) to " + _toString);
+            getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0);
             dieFatal();
             return;
         }
@@ -368,8 +370,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
             else
                 dispatchJob.runJob();
         } else {
-            if (_log.shouldLog(Log.ERROR))
-                _log.error(getJobId() + ": Could not find any outbound tunnels to send the payload through... wtf?");
+            if (_log.shouldLog(Log.WARN))
+                _log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while");
+            getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0);
             dieFatal();
         }
         _clientMessage = null;
@@ -236,7 +236,8 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Sending reply directly to " + toPeer);
             Job send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
-            getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT);
+            send.runJob();
+            //getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT);
         }
     }
 
@@ -255,7 +256,8 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
             m.setMessageExpiration(message.getMessageExpiration());
             m.setTunnelId(replyTunnel);
             SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, 100);
-            getContext().jobQueue().addJob(j);
+            j.runJob();
+            //getContext().jobQueue().addJob(j);
         }
     }
 
@@ -34,6 +34,9 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
         ctx.statManager().createRateStat("netDb.storeHandled", "How many netDb store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("netDb.storeLeaseSetHandled", "How many leaseSet store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("netDb.storeRouterInfoHandled", "How many routerInfo store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
+        ctx.statManager().createRateStat("netDb.storeRecvTime", "How long it takes to handle the local store part of a dbStore?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
+        ctx.statManager().createRateStat("netDb.storeFloodNew", "How long it takes to flood out a newly received entry?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
+        ctx.statManager().createRateStat("netDb.storeFloodOld", "How often we receive an old entry?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
         _message = receivedMessage;
         _from = from;
         _fromHash = fromHash;
@@ -44,6 +47,8 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Handling database store message");
 
+        long recvBegin = System.currentTimeMillis();
+
         String invalidMessage = null;
         boolean wasNew = false;
         if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
@@ -56,7 +61,7 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
                 // receive in response to our own lookups.
                 ls.setReceivedAsPublished(true);
             LeaseSet match = getContext().netDb().store(_message.getKey(), _message.getLeaseSet());
-            if (match == null) {
+            if ( (match == null) || (match.getEarliestLeaseDate() < _message.getLeaseSet().getEarliestLeaseDate()) ) {
                 wasNew = true;
             } else {
                 wasNew = false;
@@ -71,8 +76,8 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
                 _log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of "
                           + new Date(_message.getRouterInfo().getPublished()));
             try {
-                Object match = getContext().netDb().store(_message.getKey(), _message.getRouterInfo());
-                wasNew = (null == match);
+                RouterInfo match = getContext().netDb().store(_message.getKey(), _message.getRouterInfo());
+                wasNew = ((null == match) || (match.getPublished() < _message.getRouterInfo().getPublished()));
                 getContext().profileManager().heardAbout(_message.getKey());
             } catch (IllegalArgumentException iae) {
                 invalidMessage = iae.getMessage();
@@ -83,22 +88,34 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
                           + ": " + _message);
         }
 
+        long recvEnd = System.currentTimeMillis();
+        getContext().statManager().addRateData("netDb.storeRecvTime", recvEnd-recvBegin, 0);
+
         if (_message.getReplyToken() > 0)
             sendAck();
+        long ackEnd = System.currentTimeMillis();
 
         if (_from != null)
             _fromHash = _from.getHash();
         if (_fromHash != null) {
             if (invalidMessage == null) {
                 getContext().profileManager().dbStoreReceived(_fromHash, wasNew);
-                getContext().statManager().addRateData("netDb.storeHandled", 1, 0);
+                getContext().statManager().addRateData("netDb.storeHandled", ackEnd-recvEnd, 0);
                 if (FloodfillNetworkDatabaseFacade.floodfillEnabled(getContext()) && (_message.getReplyToken() > 0) ) {
-                    if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET)
-                        _facade.flood(_message.getLeaseSet());
-                    // ERR: see comment in HandleDatabaseLookupMessageJob regarding hidden mode
-                    //else if (!_message.getRouterInfo().isHidden())
-                    else
-                        _facade.flood(_message.getRouterInfo());
+                    if (wasNew) {
+                        long floodBegin = System.currentTimeMillis();
+                        if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET)
+                            _facade.flood(_message.getLeaseSet());
+                        // ERR: see comment in HandleDatabaseLookupMessageJob regarding hidden mode
+                        //else if (!_message.getRouterInfo().isHidden())
+                        else
+                            _facade.flood(_message.getRouterInfo());
+                        long floodEnd = System.currentTimeMillis();
+                        getContext().statManager().addRateData("netDb.storeFloodNew", floodEnd-floodBegin, 0);
+                    } else {
+                        // don't flood it *again*
+                        getContext().statManager().addRateData("netDb.storeFloodOld", 1, 0);
+                    }
                 }
             } else {
                 if (_log.shouldLog(Log.WARN))
@@ -111,14 +128,26 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
         DeliveryStatusMessage msg = new DeliveryStatusMessage(getContext());
         msg.setMessageId(_message.getReplyToken());
         msg.setArrival(getContext().clock().now());
-        TunnelInfo outTunnel = selectOutboundTunnel();
-        if (outTunnel == null) {
-            if (_log.shouldLog(Log.WARN))
-                _log.warn("No outbound tunnel could be found");
-            return;
+        /*
+        if (FloodfillNetworkDatabaseFacade.floodfillEnabled(getContext())) {
+            // no need to do anything but send it where they ask
+            TunnelGatewayMessage tgm = new TunnelGatewayMessage(getContext());
+            tgm.setMessage(msg);
+            tgm.setTunnelId(_message.getReplyTunnel());
+            tgm.setMessageExpiration(msg.getMessageExpiration());
+
+            getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), tgm, _message.getReplyGateway(), 10*1000, 200));
         } else {
-            getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), _message.getReplyTunnel(), _message.getReplyGateway());
-        }
+        */
+        TunnelInfo outTunnel = selectOutboundTunnel();
+        if (outTunnel == null) {
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("No outbound tunnel could be found");
+            return;
+        } else {
+            getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), _message.getReplyTunnel(), _message.getReplyGateway());
+        }
+        //}
     }
 
     private TunnelInfo selectOutboundTunnel() {
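
Note: the "only flood new entries" bullet from the commit message reduces to the wasNew test above: flood only when the incoming leaseSet/routerInfo is newer than what is already stored. A compact sketch of that decision, with a plain map standing in for the netDb:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: flood only when the incoming entry supersedes the stored one.
    class FloodDecision {
        private final Map<String, Long> published = new HashMap<String, Long>();

        /** @return true if the entry was new(er) and should be flooded */
        boolean storeAndCheck(String key, long publishDate) {
            Long prev = published.get(key);
            boolean wasNew = (prev == null) || (prev < publishDate);
            if (wasNew)
                published.put(key, publishDate);
            return wasNew; // caller floods only when true
        }
    }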
@@ -113,6 +113,7 @@ class HarvesterJob extends JobImpl {
             msg.setSearchKey(peer);
             msg.setReplyTunnel(null);
             SendMessageDirectJob job = new SendMessageDirectJob(getContext(), msg, peer, 10*1000, PRIORITY);
-            getContext().jobQueue().addJob(job);
+            job.runJob();
+            //getContext().jobQueue().addJob(job);
         }
     }
@@ -847,7 +847,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
 
     public int getPeerTimeout(Hash peer) {
         PeerProfile prof = _context.profileOrganizer().getProfile(peer);
-        double responseTime = prof.getDbResponseTime().getLifetimeAverageValue();
+        double responseTime = MAX_PER_PEER_TIMEOUT;
+        if (prof != null)
+            responseTime = prof.getDbResponseTime().getLifetimeAverageValue();
         if (responseTime < MIN_PER_PEER_TIMEOUT)
             responseTime = MIN_PER_PEER_TIMEOUT;
         else if (responseTime > MAX_PER_PEER_TIMEOUT)
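
Note: the getPeerTimeout() fix is a null guard plus a clamp: default to the slowest allowed timeout when no profile exists yet, then clamp the lifetime average into [MIN, MAX]. A standalone sketch (the constants are illustrative, not the real values):

    class PeerTimeout {
        static final double MIN_PER_PEER_TIMEOUT = 2 * 1000;   // illustrative
        static final double MAX_PER_PEER_TIMEOUT = 10 * 1000;  // illustrative

        // profileAvgMs is null when we have never talked to the peer
        int getPeerTimeout(Double profileAvgMs) {
            double responseTime = MAX_PER_PEER_TIMEOUT; // pessimistic default
            if (profileAvgMs != null)
                responseTime = profileAvgMs;
            if (responseTime < MIN_PER_PEER_TIMEOUT)
                responseTime = MIN_PER_PEER_TIMEOUT;
            else if (responseTime > MAX_PER_PEER_TIMEOUT)
                responseTime = MAX_PER_PEER_TIMEOUT;
            return (int) responseTime;
        }
    }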
@@ -225,12 +225,14 @@ class PersistentDataStore extends TransientDataStore {
             int routerCount = 0;
             try {
                 File dbDir = getDbDir();
-                File leaseSetFiles[] = dbDir.listFiles(LeaseSetFilter.getInstance());
-                if (leaseSetFiles != null) {
-                    for (int i = 0; i < leaseSetFiles.length; i++) {
-                        Hash key = getLeaseSetHash(leaseSetFiles[i].getName());
-                        if ( (key != null) && (!isKnown(key)) )
-                            PersistentDataStore.this._context.jobQueue().addJob(new ReadLeaseJob(leaseSetFiles[i], key));
+                if (getContext().router().getUptime() < 10*60*1000) {
+                    File leaseSetFiles[] = dbDir.listFiles(LeaseSetFilter.getInstance());
+                    if (leaseSetFiles != null) {
+                        for (int i = 0; i < leaseSetFiles.length; i++) {
+                            Hash key = getLeaseSetHash(leaseSetFiles[i].getName());
+                            if ( (key != null) && (!isKnown(key)) )
+                                PersistentDataStore.this._context.jobQueue().addJob(new ReadLeaseJob(leaseSetFiles[i], key));
+                        }
                     }
                 }
                 File routerInfoFiles[] = dbDir.listFiles(RouterInfoFilter.getInstance());
@@ -339,7 +339,7 @@ class SearchJob extends JobImpl {
     protected void sendLeaseSearch(RouterInfo router) {
         TunnelInfo inTunnel = getInboundTunnelId();
         if (inTunnel == null) {
-            _log.error("No tunnels to get search replies through!  wtf!");
+            _log.warn("No tunnels to get search replies through!  wtf!");
             getContext().jobQueue().addJob(new FailedJob(getContext(), router));
             return;
         }
@@ -362,7 +362,7 @@ class SearchJob extends JobImpl {
 
         TunnelInfo outTunnel = getOutboundTunnelId();
         if (outTunnel == null) {
-            _log.error("No tunnels to send search out through!  wtf!");
+            _log.warn("No tunnels to send search out through!  wtf!");
             getContext().jobQueue().addJob(new FailedJob(getContext(), router));
             return;
         }
@@ -398,7 +398,8 @@ class SearchJob extends JobImpl {
         SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
         SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(),
                                                           reply, new FailedJob(getContext(), router), sel, timeout, SEARCH_PRIORITY);
-        getContext().jobQueue().addJob(j);
+        j.runJob();
+        //getContext().jobQueue().addJob(j);
     }
 
     /**
@@ -278,12 +278,12 @@ class StoreJob extends JobImpl {
 
         TunnelInfo replyTunnel = selectInboundTunnel();
         if (replyTunnel == null) {
-            _log.error("No reply inbound tunnels available!");
+            _log.warn("No reply inbound tunnels available!");
             return;
         }
         TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0);
         if (replyTunnel == null) {
-            _log.error("No reply inbound tunnels available!");
+            _log.warn("No reply inbound tunnels available!");
             return;
         }
         msg.setReplyToken(token);
@@ -312,8 +312,8 @@ class StoreJob extends JobImpl {
             getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now()));
             getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, peer.getIdentity().getHash());
         } else {
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("No outbound tunnels to send a dbStore out!");
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("No outbound tunnels to send a dbStore out!");
             fail();
         }
     }
@@ -156,7 +156,8 @@ public class DBHistory {
     public void setUnpromptedDbStoreOld(long num) { _unpromptedDbStoreOld = num; }
 
     public void coalesceStats() {
-        _log.debug("Coallescing stats");
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Coallescing stats");
         _failedLookupRate.coalesceStats();
         _invalidReplyRate.coalesceStats();
     }
@@ -1,39 +0,0 @@
-package net.i2p.router.peermanager;
-
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Set;
-
-import net.i2p.data.Hash;
-import net.i2p.router.JobImpl;
-import net.i2p.router.RouterContext;
-import net.i2p.util.Log;
-
-/**
- * Run across all of the profiles, coallescing the stats and reorganizing them
- * into appropriate groups.  The stat coalesce must be run at least once a minute,
- * so if the group reorg wants to get changed, this may want to be split into two
- * jobs.
- *
- */
-class EvaluateProfilesJob extends JobImpl {
-    private Log _log;
-
-    public EvaluateProfilesJob(RouterContext ctx) {
-        super(ctx);
-        _log = ctx.logManager().getLog(EvaluateProfilesJob.class);
-    }
-
-    public String getName() { return "Evaluate peer profiles"; }
-    public void runJob() {
-        try {
-            getContext().profileOrganizer().reorganize(true);
-        } catch (Throwable t) {
-            _log.log(Log.CRIT, "Error evaluating profiles", t);
-        } finally {
-            requeue(30*1000);
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Requeued for " + new Date(getTiming().getStartAfter()));
-        }
-    }
-}
@@ -15,6 +15,7 @@ import java.util.*;
 import net.i2p.data.Hash;
 import net.i2p.router.PeerSelectionCriteria;
 import net.i2p.router.RouterContext;
+import net.i2p.util.SimpleTimer;
 import net.i2p.util.Log;
 
 /**
@@ -40,10 +41,23 @@ class PeerManager {
         for (int i = 0; i < _peersByCapability.length; i++)
             _peersByCapability[i] = new ArrayList(64);
         loadProfiles();
-        _context.jobQueue().addJob(new EvaluateProfilesJob(_context));
+        ////_context.jobQueue().addJob(new EvaluateProfilesJob(_context));
+        SimpleTimer.getInstance().addEvent(new Reorg(), 0);
         //_context.jobQueue().addJob(new PersistProfilesJob(_context, this));
     }
 
+    private class Reorg implements SimpleTimer.TimedEvent {
+        public void timeReached() {
+            try {
+                _organizer.reorganize(true);
+            } catch (Throwable t) {
+                _log.log(Log.CRIT, "Error evaluating profiles", t);
+            } finally {
+                SimpleTimer.getInstance().addEvent(Reorg.this, 30*1000);
+            }
+        }
+    }
+
     void storeProfiles() {
         Set peers = selectPeers();
         for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
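
Note: this is the headline change of the commit: the 30-second profile reorganization now reschedules itself on a shared timer rather than competing with router jobs in the job queue. The same self-rescheduling shape, sketched with java.util.Timer in place of I2P's SimpleTimer:

    import java.util.Timer;
    import java.util.TimerTask;

    class ReorgScheduler {
        private final Timer timer = new Timer("reorg", true); // daemon thread

        void start() { schedule(0); }

        private void schedule(long delayMs) {
            timer.schedule(new TimerTask() {
                public void run() {
                    try {
                        reorganize();
                    } catch (Throwable t) {
                        t.printStackTrace(); // never let one failure kill the timer
                    } finally {
                        schedule(30 * 1000); // reschedule, as the Reorg event does
                    }
                }
            }, delayMs);
        }

        void reorganize() { /* coalesce stats, re-tier peers */ }
    }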
@@ -115,7 +115,7 @@ public class PeerTestJob extends JobImpl {
     private void testPeer(RouterInfo peer) {
         TunnelInfo inTunnel = getInboundTunnelId();
         if (inTunnel == null) {
-            _log.error("No tunnels to get peer test replies through!  wtf!");
+            _log.warn("No tunnels to get peer test replies through!  wtf!");
             return;
         }
         TunnelId inTunnelId = inTunnel.getReceiveTunnelId(0);
@@ -135,7 +135,7 @@ public class PeerTestJob extends JobImpl {
 
         TunnelInfo outTunnel = getOutboundTunnelId();
         if (outTunnel == null) {
-            _log.error("No tunnels to send search out through!  wtf!");
+            _log.warn("No tunnels to send search out through!  wtf!");
             return;
         }
 
@@ -98,6 +98,12 @@ public class ProfileOrganizer {
         _thresholdCapacityValue = 0.0d;
         _thresholdIntegrationValue = 0.0d;
         _persistenceHelper = new ProfilePersistenceHelper(_context);
+
+        _context.statManager().createRateStat("peer.profileSortTime", "How long the reorg takes sorting peers", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+        _context.statManager().createRateStat("peer.profileCoalesceTime", "How long the reorg takes coalescing peer stats", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+        _context.statManager().createRateStat("peer.profileThresholdTime", "How long the reorg takes determining the tier thresholds", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+        _context.statManager().createRateStat("peer.profilePlaceTime", "How long the reorg takes placing peers in the tiers", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+        _context.statManager().createRateStat("peer.profileReorgTime", "How long the reorg takes overall", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
     }
 
     public void setUs(Hash us) { _us = us; }
@@ -411,6 +417,20 @@ public class ProfileOrganizer {
      */
     public void reorganize() { reorganize(false); }
    public void reorganize(boolean shouldCoalesce) {
+        long sortTime = 0;
+        int coalesceTime = 0;
+        long thresholdTime = 0;
+        long placeTime = 0;
+        int profileCount = 0;
+
+        long uptime = _context.router().getUptime();
+        long expireOlderThan = -1;
+        if (uptime > 60*60*1000) {
+            // drop profiles that we haven't spoken with in 6 hours
+            expireOlderThan = _context.clock().now() - 6*60*60*1000;
+        }
+
+        long start = System.currentTimeMillis();
         synchronized (_reorganizeLock) {
             Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
             //allPeers.addAll(_failingPeers.values());
@@ -419,15 +439,26 @@ public class ProfileOrganizer {
             //allPeers.addAll(_fastPeers.values());
 
             Set reordered = new TreeSet(_comp);
+            long sortStart = System.currentTimeMillis();
             for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
                 PeerProfile prof = (PeerProfile)iter.next();
-                if (shouldCoalesce)
+                if ( (expireOlderThan > 0) && (prof.getLastSendSuccessful() <= expireOlderThan) )
+                    continue; // drop, but no need to delete, since we don't periodically reread
+
+                if (shouldCoalesce) {
+                    long coalesceStart = System.currentTimeMillis();
                     prof.coalesceStats();
+                    coalesceTime += (int)(System.currentTimeMillis()-coalesceStart);
+                }
                 reordered.add(prof);
+                profileCount++;
             }
+            sortTime = System.currentTimeMillis() - sortStart;
             _strictCapacityOrder = reordered;
 
+            long thresholdStart = System.currentTimeMillis();
             locked_calculateThresholds(allPeers);
+            thresholdTime = System.currentTimeMillis()-thresholdStart;
 
             _failingPeers.clear();
             _fastPeers.clear();
@@ -436,6 +467,8 @@ public class ProfileOrganizer {
             _notFailingPeersList.clear();
             _wellIntegratedPeers.clear();
 
+            long placeStart = System.currentTimeMillis();
+
             for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
                 PeerProfile profile = (PeerProfile)iter.next();
                 locked_placeProfile(profile);
@@ -446,6 +479,8 @@ public class ProfileOrganizer {
 
             Collections.shuffle(_notFailingPeersList, _context.random());
 
+            placeTime = System.currentTimeMillis()-placeStart;
+
             if (_log.shouldLog(Log.DEBUG)) {
                 _log.debug("Profiles reorganized.  averages: [integration: " + _thresholdIntegrationValue
                            + ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]");
@@ -458,6 +493,13 @@ public class ProfileOrganizer {
                 _log.debug("fast: " + _fastPeers.values());
             }
         }
+
+        long total = System.currentTimeMillis()-start;
+        _context.statManager().addRateData("peer.profileSortTime", sortTime, profileCount);
+        _context.statManager().addRateData("peer.profileCoalesceTime", coalesceTime, profileCount);
+        _context.statManager().addRateData("peer.profileThresholdTime", thresholdTime, profileCount);
+        _context.statManager().addRateData("peer.profilePlaceTime", placeTime, profileCount);
+        _context.statManager().addRateData("peer.profileReorgTime", total, profileCount);
     }
 
     /**
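
Note: besides timing each reorg phase for the new peer.profile* stats, reorganize() now trims profiles that have been idle for six hours once the router has an hour of uptime. The expiry test in isolation (method and field names are illustrative):

    // Sketch: decide whether a profile is stale enough to drop during reorg.
    class ProfileExpiry {
        static final long IDLE_LIMIT = 6 * 60 * 60 * 1000L; // 6 hours, as above
        static final long MIN_UPTIME = 60 * 60 * 1000L;     // only expire after 1h up

        boolean shouldDrop(long uptimeMs, long nowMs, long lastSendSuccessfulMs) {
            if (uptimeMs <= MIN_UPTIME)
                return false; // too early to judge
            long expireOlderThan = nowMs - IDLE_LIMIT;
            return lastSendSuccessfulMs <= expireOlderThan;
        }
    }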
@@ -92,8 +92,8 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec
                 // so we send it out a tunnel first
                 TunnelInfo out = _context.tunnelManager().selectOutboundTunnel(_client);
                 if (out == null) {
-                    if (_log.shouldLog(Log.ERROR))
-                        _log.error("no outbound tunnel to send the client message for " + _client + ": " + msg);
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("no outbound tunnel to send the client message for " + _client + ": " + msg);
                     return;
                 }
                 if (_log.shouldLog(Log.INFO))
@@ -532,6 +532,7 @@ public class TunnelDispatcher implements Service {
 
         public LeaveTunnel(RouterContext ctx) {
             super(ctx);
+            getTiming().setStartAfter(ctx.clock().now());
             _configs = new ArrayList(128);
             _times = new ArrayList(128);
         }
@@ -543,12 +544,15 @@ public class TunnelDispatcher implements Service {
                 _times.add(dropTime);
             }
 
+            if (_log.shouldLog(Log.INFO)) {
+                long now = getContext().clock().now();
+                _log.info("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg);
+            }
+
             long oldAfter = getTiming().getStartAfter();
-            if (oldAfter < getContext().clock().now()) {
+            if ( (oldAfter <= 0) || (oldAfter < getContext().clock().now()) || (oldAfter >= dropTime.longValue()) ) {
                 getTiming().setStartAfter(dropTime.longValue());
                 getContext().jobQueue().addJob(LeaveTunnel.this);
-            } else if (oldAfter >= dropTime.longValue()) {
-                getTiming().setStartAfter(dropTime.longValue());
             } else {
                 // already scheduled for the future, and before this expiration
             }
@@ -559,22 +563,28 @@ public class TunnelDispatcher implements Service {
             HopConfig cur = null;
             Long nextTime = null;
             long now = getContext().clock().now();
-            synchronized (LeaveTunnel.this) {
-                if (_configs.size() <= 0)
-                    return;
-                nextTime = (Long)_times.get(0);
-                if (nextTime.longValue() <= now) {
-                    cur = (HopConfig)_configs.remove(0);
-                    _times.remove(0);
-                    if (_times.size() > 0)
-                        nextTime = (Long)_times.get(0);
-                    else
-                        nextTime = null;
+            while (true) {
+                synchronized (LeaveTunnel.this) {
+                    if (_configs.size() <= 0)
+                        return;
+                    nextTime = (Long)_times.get(0);
+                    if (nextTime.longValue() <= now) {
+                        cur = (HopConfig)_configs.remove(0);
+                        _times.remove(0);
+                        if (_times.size() > 0)
+                            nextTime = (Long)_times.get(0);
+                        else
+                            nextTime = null;
+                    } else {
+                        cur = null;
+                    }
                 }
-            }
 
                 if (cur != null)
                     remove(cur);
+                else
+                    break;
+            }
 
             if (nextTime != null) {
                 getTiming().setStartAfter(nextTime.longValue());
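
Note: the LeaveTunnel rewrite (part of "make sure we expire old participants properly") drains every expired hop in a loop, holding the lock only while inspecting the lists and doing the real removal outside it. The same drain shape in miniature (types simplified):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: drain all entries whose time has passed; lock only around list access.
    class ExpiryDrain {
        private final List<Long> times = new ArrayList<Long>();
        private final List<Object> configs = new ArrayList<Object>();

        void expire(long now) {
            while (true) {
                Object cur;
                synchronized (this) {
                    if (configs.isEmpty())
                        return;
                    if (times.get(0) <= now) {
                        cur = configs.remove(0);
                        times.remove(0);
                    } else {
                        cur = null;
                    }
                }
                if (cur != null)
                    remove(cur); // do the real work outside the lock
                else
                    break;       // nothing else is due yet
            }
        }
        void remove(Object cfg) { /* drop the participant */ }
    }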
@@ -4,6 +4,7 @@ import java.util.*;
 import net.i2p.router.Job;
 import net.i2p.router.JobImpl;
 import net.i2p.router.RouterContext;
+import net.i2p.router.TunnelManagerFacade;
 import net.i2p.router.tunnel.TunnelCreatorConfig;
 import net.i2p.util.Log;
 
@@ -73,7 +74,7 @@ class BuildExecutor implements Runnable {
         List wanted = new ArrayList(8);
         List pools = new ArrayList(8);
 
-        while (!_manager.isShutdown()) {
+        while (!_manager.isShutdown()){
             try {
                 _manager.listPools(pools);
                 for (int i = 0; i < pools.size(); i++) {
@@ -91,38 +92,51 @@ class BuildExecutor implements Runnable {
                 // zero hop ones can run inline
                 allowed = buildZeroHopTunnels(wanted, allowed);
 
-                if ( (allowed > 0) && (wanted.size() > 0) ) {
-                    Collections.shuffle(wanted, _context.random());
-                    for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) {
-                        TunnelPool pool = (TunnelPool)wanted.remove(0);
-                        //if (pool.countWantedTunnels() <= 0)
-                        //    continue;
-                        PooledTunnelCreatorConfig cfg = pool.configureNewTunnel();
-                        if (cfg != null) {
-                            if (_log.shouldLog(Log.DEBUG))
-                                _log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg);
-                            synchronized (_currentlyBuilding) {
-                                _currentlyBuilding.add(cfg);
-                            }
-                            buildTunnel(pool, cfg);
-                            if (cfg.getLength() <= 1)
-                                i--; //0hop, we can keep going, as there's no worry about throttling
-                        } else {
-                            i--;
-                        }
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Zero hops built, Allowed: " + allowed + " wanted: " + wanted);
+
+                TunnelManagerFacade mgr = _context.tunnelManager();
+                if ( (mgr == null) || (mgr.selectInboundTunnel() == null) || (mgr.selectOutboundTunnel() == null) ) {
+                    // we don't have either inbound or outbound tunnels, so don't bother trying to build
+                    // non-zero-hop tunnels
+                    synchronized (_currentlyBuilding) {
+                        _currentlyBuilding.wait(5*1000+_context.random().nextInt(5*1000));
                     }
                 } else {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("Nothin' doin, wait for a while");
-                    try {
-                        synchronized (_currentlyBuilding) {
-                            if (allowed <= 0)
-                                _currentlyBuilding.wait(_context.random().nextInt(5*1000));
-                            else // wanted <= 0
-                                _currentlyBuilding.wait(_context.random().nextInt(30*1000));
+                    if ( (allowed > 0) && (wanted.size() > 0) ) {
+                        Collections.shuffle(wanted, _context.random());
+                        for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) {
+                            TunnelPool pool = (TunnelPool)wanted.remove(0);
+                            //if (pool.countWantedTunnels() <= 0)
+                            //    continue;
+                            PooledTunnelCreatorConfig cfg = pool.configureNewTunnel();
+                            if (cfg != null) {
+                                if (_log.shouldLog(Log.DEBUG))
+                                    _log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg);
+                                synchronized (_currentlyBuilding) {
+                                    _currentlyBuilding.add(cfg);
+                                }
+                                buildTunnel(pool, cfg);
+                                // 0hops are taken care of above, these are nonstandard 0hops
+                                //if (cfg.getLength() <= 1)
+                                //    i--; //0hop, we can keep going, as there's no worry about throttling
+                            } else {
+                                i--;
+                            }
                         }
-                    } catch (InterruptedException ie) {
-                        // someone wanted to build something
+                    } else {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Nothin' doin, wait for a while");
+                        try {
+                            synchronized (_currentlyBuilding) {
+                                if (allowed <= 0)
+                                    _currentlyBuilding.wait(_context.random().nextInt(5*1000));
+                                else // wanted <= 0
+                                    _currentlyBuilding.wait(_context.random().nextInt(30*1000));
+                            }
+                        } catch (InterruptedException ie) {
+                            // someone wanted to build something
                         }
-                } catch (InterruptedException ie) {
-                    // someone wanted to build something
                     }
                 }
 
@@ -193,8 +193,11 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
                 _log.debug("sending (" + status + ") to the tunnel "
                            + _request.getReplyGateway().toBase64().substring(0,4) + ":"
                            + _request.getReplyTunnel() + " wrt " + _request);
-            getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), gw, _request.getReplyGateway(),
-                                                                    REPLY_TIMEOUT, REPLY_PRIORITY));
+            SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw, _request.getReplyGateway(),
+                                                                REPLY_TIMEOUT, REPLY_PRIORITY);
+            // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
+            job.runJob();
+            //getContext().jobQueue().addJob(job);
         }
 
         private GarlicMessage createReply(TunnelCreateStatusMessage reply) {
@@ -14,6 +14,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
     private boolean _failed;
     private TestJob _testJob;
     private Job _expireJob;
+    private int _failures;
 
     /** Creates a new instance of PooledTunnelCreatorConfig */
 
@@ -24,6 +25,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
         super(ctx, length, isInbound, destination);
         _failed = false;
         _pool = null;
+        _failures = 0;
     }
 
 
@@ -31,6 +33,11 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
         if (_testJob != null) {
             _testJob.testSuccessful(ms);
         }
+        int failures = _failures - 1;
+        if (failures < 0)
+            _failures = 0;
+        else
+            _failures = failures;
     }
 
     public Properties getOptions() {
@@ -38,17 +45,25 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
             return _pool.getSettings().getUnknownOptions();
         }
 
+    private static final int MAX_CONSECUTIVE_TEST_FAILURES = 2;
+
     /**
      * The tunnel failed, so stop using it
      */
-    public void tunnelFailed() {
-        _failed = true;
-        // remove us from the pool (but not the dispatcher) so that we aren't
-        // selected again.  _expireJob is left to do its thing, in case there
-        // are any straggling messages coming down the tunnel
-        _pool.tunnelFailed(this);
-        if (_testJob != null) // just in case...
-            _context.jobQueue().removeJob(_testJob);
+    public boolean tunnelFailed() {
+        _failures++;
+        if (_failures > MAX_CONSECUTIVE_TEST_FAILURES) {
+            _failed = true;
+            // remove us from the pool (but not the dispatcher) so that we aren't
+            // selected again.  _expireJob is left to do its thing, in case there
+            // are any straggling messages coming down the tunnel
+            _pool.tunnelFailed(this);
+            if (_testJob != null) // just in case...
+                _context.jobQueue().removeJob(_testJob);
+            return false;
+        } else {
+            return true;
+        }
     }
     public boolean getTunnelFailed() { return _failed; }
     public void setTunnelPool(TunnelPool pool) { _pool = pool; }
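
Note: tunnelFailed() now reports whether the tunnel survives: a tunnel is only retired after MAX_CONSECUTIVE_TEST_FAILURES failures in a row, and each successful test decays the counter. The counter logic on its own (a sketch, not the exact class):

    // Sketch: retire only after several consecutive test failures.
    class FailureCounter {
        static final int MAX_CONSECUTIVE_TEST_FAILURES = 2; // as in the diff
        private int failures = 0;

        void testSucceeded() {
            if (failures > 0)
                failures--; // successes decay the count
        }

        /** @return true if the tunnel should keep being used (and be retested) */
        boolean testFailed() {
            failures++;
            return failures <= MAX_CONSECUTIVE_TEST_FAILURES;
        }
    }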
@@ -78,8 +78,8 @@ public class RequestTunnelJob extends JobImpl {
         ctx.statManager().createRateStat("tunnel.buildExploratorySuccess3Hop", "How often we succeed building a 3 hop exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("tunnel.buildPartialTime", "How long a non-exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("tunnel.buildExploratoryPartialTime", "How long an exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        ctx.statManager().createRateStat("tunnel.buildExploratoryTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        ctx.statManager().createRateStat("tunnel.buildClientTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
+        ctx.statManager().createRateStat("tunnel.buildExploratoryTimeout", "How often a request for an exploratory tunnel's peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
+        ctx.statManager().createRateStat("tunnel.buildClientTimeout", "How often a request for a client tunnel's peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
 
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Requesting hop " + hop + " in " + cfg);
@@ -192,9 +192,10 @@ public class RequestTunnelJob extends JobImpl {
 
         TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
         if (replyTunnel == null) {
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("No inbound tunnels to build tunnels with!");
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("No inbound tunnels to build tunnels with!");
             tunnelFail();
+            return;
         }
         Hash replyGateway = replyTunnel.getPeer(0);
 
@@ -214,6 +215,11 @@ public class RequestTunnelJob extends JobImpl {
         msg.setReplyTag(replyTag);
         int duration = 10*60; // (int)((_config.getExpiration() - getContext().clock().now())/1000);
         msg.setDurationSeconds(duration);
+        long now = getContext().clock().now();
+        if (_isExploratory)
+            msg.setMessageExpiration(now + HOP_REQUEST_TIMEOUT_EXPLORATORY);
+        else
+            msg.setMessageExpiration(now + HOP_REQUEST_TIMEOUT_CLIENT);
         if (_currentHop == 0)
             msg.setIsGateway(true);
         else
@@ -75,6 +75,7 @@ class SendGarlicMessageJob extends JobImpl {
             if (_onTimeout != null)
                 getContext().jobQueue().addJob(_onTimeout);
             getContext().messageRegistry().unregisterPending(dummyMessage);
+            return;
         }
         TunnelId outId = out.getSendTunnelId(0);
         if (_log.shouldLog(Log.DEBUG))
@ -41,6 +41,10 @@ class TestJob extends JobImpl {
                 new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("tunnel.testExploratoryFailedTime", "How long did the failure of an exploratory tunnel take (max of 60s for full timeout)?", "Tunnels",
                 new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
+        ctx.statManager().createRateStat("tunnel.testFailedCompletelyTime", "How long did the complete failure take (max of 60s for full timeout)?", "Tunnels",
+                new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
+        ctx.statManager().createRateStat("tunnel.testExploratoryFailedCompletelyTime", "How long did the complete failure of an exploratory tunnel take (max of 60s for full timeout)?", "Tunnels",
+                new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("tunnel.testSuccessLength", "How long were the tunnels that passed the test?", "Tunnels",
                 new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
         ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long did tunnel testing take?", "Tunnels",
@ -72,8 +76,8 @@ class TestJob extends JobImpl {
         }

         if ( (_replyTunnel == null) || (_outTunnel == null) ) {
-            if (_log.shouldLog(Log.ERROR))
-                _log.error("Insufficient tunnels to test " + _cfg + " with: " + _replyTunnel + " / " + _outTunnel);
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Insufficient tunnels to test " + _cfg + " with: " + _replyTunnel + " / " + _outTunnel);
             getContext().statManager().addRateData("tunnel.testAborted", _cfg.getLength(), 0);
             scheduleRetest();
         } else {

@ -161,9 +165,17 @@ class TestJob extends JobImpl {
             getContext().statManager().addRateData("tunnel.testExploratoryFailedTime", timeToFail, timeToFail);
         else
             getContext().statManager().addRateData("tunnel.testFailedTime", timeToFail, timeToFail);
-        _cfg.tunnelFailed();
         if (_log.shouldLog(Log.WARN))
             _log.warn("Tunnel test failed in " + timeToFail + "ms: " + _cfg);
+        boolean keepGoing = _cfg.tunnelFailed();
+        if (keepGoing) {
+            scheduleRetest();
+        } else {
+            if (_pool.getSettings().isExploratory())
+                getContext().statManager().addRateData("tunnel.testExploratoryFailedCompletelyTime", timeToFail, timeToFail);
+            else
+                getContext().statManager().addRateData("tunnel.testFailedCompletelyTime", timeToFail, timeToFail);
+        }
     }

     /** randomized time we should wait before testing */
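The failure path above now lets the tunnel vote on its own fate: _cfg.tunnelFailed() returns whether the tunnel should stay in service, so a true result schedules another probe while false records the new complete-failure stat instead. A self-contained sketch of that control flow, assuming a simple three-strikes policy purely for illustration (the real threshold in the tunnel configuration may differ):

    // Sketch of the retest-vs-give-up decision above. The three-strikes
    // threshold is an assumption for illustration, not the real policy.
    public class TunnelFailureSketch {
        static class Config {
            int failures;
            /** @return true if the tunnel is still worth retesting */
            boolean tunnelFailed() { return ++failures < 3; }
        }

        public static void main(String[] args) {
            Config cfg = new Config();
            long timeToFail = 60000;
            for (int test = 1; test <= 4; test++) {
                boolean keepGoing = cfg.tunnelFailed();
                if (keepGoing) {
                    System.out.println("test " + test + ": schedule retest");
                } else {
                    // third strike: record the complete failure and stop probing
                    System.out.println("test " + test + ": record testFailedCompletelyTime = " + timeToFail);
                    break;
                }
            }
        }
    }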
@ -45,6 +45,7 @@ abstract class TunnelPeerSelector {
             if (length < 0)
                 length = 0;
         }
+        /*
         if ( (ctx.tunnelManager().getOutboundTunnelCount() <= 0) ||
              (ctx.tunnelManager().getFreeTunnelCount() <= 0) ) {
             Log log = ctx.logManager().getLog(TunnelPeerSelector.class);

@ -59,6 +60,7 @@ abstract class TunnelPeerSelector {
                 return -1;
             }
         }
+        */
         return length;
     }

@ -241,6 +241,8 @@ public class TunnelPool {
             }
         }

+        _manager.getExecutor().repoll();
+
         _lifetimeProcessed += info.getProcessedMessagesCount();

         long lifetimeConfirmed = info.getVerifiedBytesTransferred();

@ -260,8 +262,6 @@ public class TunnelPool {
             }
         }

-        _manager.getExecutor().repoll();
-
         boolean connected = true;
         if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) )
             connected = false;
@ -410,35 +410,42 @@ public class TunnelPool {

         boolean allowZeroHop = ((getSettings().getLength() + getSettings().getLengthVariance()) <= 0);

-        long expireAfter = _context.clock().now() + (2 * _settings.getRebuildPeriod());
-        expireAfter += _expireSkew;
+        long expireAfter = _context.clock().now() + _expireSkew; // + _settings.getRebuildPeriod() + _expireSkew;
+        int expire30s = 0;
+        int expire90s = 0;
+        int expire150s = 0;
+        int expire210s = 0;
+        int expire270s = 0;
+        int expireLater = 0;

-        long earliestExpire = -1;
-        int live = 0;
         int fallback = 0;
-        int usable = 0;
         synchronized (_tunnels) {
             boolean enough = _tunnels.size() > wanted;
             for (int i = 0; i < _tunnels.size(); i++) {
                 TunnelInfo info = (TunnelInfo)_tunnels.get(i);
-                if (info.getExpiration() > expireAfter) {
-                    if (allowZeroHop || (info.getLength() > 1)) {
-                        usable++;
-                        if ( (info.getExpiration() < earliestExpire) || (earliestExpire < 0) )
-                            earliestExpire = info.getExpiration();
+                if (allowZeroHop || (info.getLength() > 1)) {
+                    long timeToExpire = info.getExpiration() - expireAfter;
+                    if (timeToExpire <= 0) {
+                        // consider it unusable
+                    } else if (timeToExpire <= 30*1000) {
+                        expire30s++;
+                    } else if (timeToExpire <= 90*1000) {
+                        expire90s++;
+                    } else if (timeToExpire <= 150*1000) {
+                        expire150s++;
+                    } else if (timeToExpire <= 210*1000) {
+                        expire210s++;
+                    } else if (timeToExpire <= 270*1000) {
+                        expire270s++;
+                    } else {
+                        expireLater++;
                     }
-                }
-                live++;
-                if ( (info.getLength() <= 1) && (info.getExpiration() > expireAfter) )
+                } else if (info.getExpiration() > expireAfter) {
                     fallback++;
+                }
             }
         }

-        if (usable < wanted) {
-            // if we are short on tunnels, build fast
-            earliestExpire = 0;
-        }

         int inProgress = 0;
         synchronized (_inProgress) {
             inProgress = _inProgress.size();
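Rather than tracking only a single earliestExpire, the loop above histograms the usable tunnels into 60-second buckets by how close to expiry they are relative to expireAfter; anything already past that threshold counts as unusable, and long-lived tunnels land in expireLater. A runnable sketch of the same bucketing, with times in milliseconds relative to expireAfter:

    // Sketch of the expiry histogram built in the loop above.
    public class ExpiryBuckets {
        public static void main(String[] args) {
            long[] timeToExpire = { -5000, 20000, 80000, 200000, 400000 };
            int[] buckets = new int[6]; // 30s, 90s, 150s, 210s, 270s, later
            int unusable = 0;
            for (long t : timeToExpire) {
                if (t <= 0) unusable++;            // already past expireAfter
                else if (t <= 30*1000) buckets[0]++;
                else if (t <= 90*1000) buckets[1]++;
                else if (t <= 150*1000) buckets[2]++;
                else if (t <= 210*1000) buckets[3]++;
                else if (t <= 270*1000) buckets[4]++;
                else buckets[5]++;                 // long-lived: satisfies demand
            }
            System.out.println("unusable=" + unusable);
            System.out.println("30s=" + buckets[0] + " 90s=" + buckets[1]
                    + " 150s=" + buckets[2] + " 210s=" + buckets[3]
                    + " 270s=" + buckets[4] + " later=" + buckets[5]);
            // prints: unusable=1, then 30s=1 90s=1 150s=0 210s=1 270s=0 later=1
        }
    }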
@ -449,7 +456,8 @@ public class TunnelPool {
             }
         }

-        return countHowManyToBuild(allowZeroHop, earliestExpire, usable, wanted, inProgress, fallback);
+        return countHowManyToBuild(allowZeroHop, expire30s, expire90s, expire150s, expire210s, expire270s,
+                                   expireLater, wanted, inProgress, fallback);
     }

     /**
@ -459,45 +467,94 @@ public class TunnelPool {
      * @param allowZeroHop do we normally allow zero hop tunnels?  If true, treat fallback tunnels like normal ones
      * @param earliestExpire how soon do some of our usable tunnels expire, or, if we are missing tunnels, -1
      * @param usable how many tunnels will be around for a while (may include fallback tunnels)
+     * @param wantToReplace how many tunnels are still usable, but approaching unusability
      * @param standardAmount how many tunnels we want to have, in general
      * @param inProgress how many tunnels are being built for this pool right now (may include fallback tunnels)
      * @param fallback how many zero hop tunnels do we have, or are being built
      */
-    private int countHowManyToBuild(boolean allowZeroHop, long earliestExpire, int usable, int standardAmount,
-                                    int inProgress, int fallback) {
-        int howMany = 0;
+    private int countHowManyToBuild(boolean allowZeroHop, int expire30s, int expire90s, int expire150s, int expire210s,
+                                    int expire270s, int expireLater, int standardAmount, int inProgress, int fallback) {
+        int rv = 0;
+        int remainingWanted = standardAmount - expireLater;
         if (allowZeroHop)
-            howMany = standardAmount - usable;
-        else
-            howMany = standardAmount - (usable - fallback);
+            remainingWanted -= fallback;

-        int concurrentBuildWeight = 1;
-        if (howMany > 0) {
-            long now = _context.clock().now();
-            if (earliestExpire - now < 60*1000)
-                concurrentBuildWeight = 4; // right before expiration, allow up to 4x quantity tunnels to be pending
-            else if (earliestExpire - now < 120*1000)
-                concurrentBuildWeight = 3; // allow up to 3x quantity tunnels to be pending from 1-2m
-            else if (earliestExpire - now < 180*1000)
-                concurrentBuildWeight = 2; // allow up to 2x quantity tunnels to be pending from 2-3m
-            // e.g. we want 3 tunnels, but only have 1 usable, we'd want 2 more.  however, if the tunnels
-            // expire in 90 seconds, we'd act like we wanted 6 (and assume 4 would fail building).
-            howMany = (howMany * concurrentBuildWeight) - inProgress;
+        for (int i = 0; i < expire270s && remainingWanted > 0; i++)
+            remainingWanted--;
+        if (remainingWanted > 0) {
+            // 1x the tunnels expiring between 3.5 and 2.5 minutes from now
+            for (int i = 0; i < expire210s && remainingWanted > 0; i++) {
+                remainingWanted--;
+            }
+            if (remainingWanted > 0) {
+                // 2x the tunnels expiring between 2.5 and 1.5 minutes from now
+                for (int i = 0; i < expire150s && remainingWanted > 0; i++) {
+                    remainingWanted--;
+                }
+                if (remainingWanted > 0) {
+                    for (int i = 0; i < expire90s && remainingWanted > 0; i++) {
+                        remainingWanted--;
+                    }
+                    if (remainingWanted > 0) {
+                        for (int i = 0; i < expire30s && remainingWanted > 0; i++) {
+                            remainingWanted--;
+                        }
+                        if (remainingWanted > 0) {
+                            rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0);
+                            rv += expire210s;
+                            rv += 2*expire150s;
+                            rv += 4*expire90s;
+                            rv += 6*expire30s;
+                            rv += 6*remainingWanted;
+                            rv -= inProgress;
+                            rv -= expireLater;
+                        } else {
+                            rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0);
+                            rv += expire210s;
+                            rv += 2*expire150s;
+                            rv += 4*expire90s;
+                            rv += 6*expire30s;
+                            rv -= inProgress;
+                            rv -= expireLater;
+                        }
+                    } else {
+                        rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0);
+                        rv += expire210s;
+                        rv += 2*expire150s;
+                        rv += 4*expire90s;
+                        rv -= inProgress;
+                        rv -= expireLater;
+                    }
+                } else {
+                    rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0);
+                    rv += expire210s;
+                    rv += 2*expire150s;
+                    rv -= inProgress;
+                    rv -= expireLater;
+                }
+            } else {
+                rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0);
+                rv += expire210s;
+                rv -= inProgress;
+                rv -= expireLater;
+            }
+        } else {
+            rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0);
+            rv -= inProgress;
+            rv -= expireLater;
         }
+        // yes, the above numbers and periods are completely arbitrary.  suggestions welcome

-        int rv = howMany;
-        // ok, we're actually swamped with tunnels, so lets hold off on replacing the
-        // fallback ones for a bit
-        if ( (usable + inProgress + fallback > 2*standardAmount) && (howMany > 0) )
-            rv = 0;
-
         if (allowZeroHop && (rv > standardAmount))
             rv = standardAmount;

+        if (rv + inProgress + expireLater + fallback > 4*standardAmount)
+            rv = 4*standardAmount - inProgress - expireLater - fallback;

         if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Count: rv: " + rv + " howMany " + howMany + " concurrentWeight " + concurrentBuildWeight
-                       + " allow? " + allowZeroHop + " usable " + usable
+            _log.debug("Count: rv: " + rv + " allow? " + allowZeroHop
+                       + " 30s " + expire30s + " 90s " + expire90s + " 150s " + expire150s + " 210s " + expire210s
+                       + " 270s " + expire270s + " later " + expireLater
                        + " std " + standardAmount + " inProgress " + inProgress + " fallback " + fallback
                        + " for " + toString());

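To see the new weighting in action: suppose the pool wants 3 tunnels, 2 usable ones expire within 90 seconds, and nothing else exists or is in progress. The soon-to-expire pair is rebuilt at 4x, the outright missing tunnel at 6x, for a raw count of 4*2 + 6*1 = 14, which the new hard cap then trims to 4*standardAmount = 12 concurrent builds; that cap is what keeps the aggressive multipliers from flooding the build executor. Below is a condensed but, as far as I can tell, behaviorally equivalent restatement of the nested branch above (the random 270s coin flip is passed in as a parameter so the demo is deterministic; this is a sketch, not the shipped code):

    // Condensed restatement of countHowManyToBuild() as rewritten above.
    public class BuildCount {
        static int countHowManyToBuild(boolean allowZeroHop, int expire30s, int expire90s,
                                       int expire150s, int expire210s, int expire270s,
                                       int expireLater, int standardAmount, int inProgress,
                                       int fallback, boolean coinFlip) {
            int remaining = standardAmount - expireLater;
            if (allowZeroHop)
                remaining -= fallback;

            // the 270s bucket only ever contributes the coin flip
            int rv = ((expire270s > 0) && coinFlip) ? 1 : 0;
            remaining -= Math.min(Math.max(remaining, 0), expire270s);

            // nearer buckets are weighted more heavily, but only the levels
            // actually reached before demand ran out are counted
            int[] bucket = { expire210s, expire150s, expire90s, expire30s };
            int[] weight = { 1, 2, 4, 6 };
            for (int i = 0; i < bucket.length && remaining > 0; i++) {
                rv += weight[i] * bucket[i];
                remaining -= Math.min(remaining, bucket[i]);
            }
            if (remaining > 0)
                rv += 6 * remaining; // outright missing tunnels build most aggressively
            rv -= inProgress;
            rv -= expireLater;

            if (allowZeroHop && (rv > standardAmount))
                rv = standardAmount;
            if (rv + inProgress + expireLater + fallback > 4 * standardAmount)
                rv = 4 * standardAmount - inProgress - expireLater - fallback;
            return rv;
        }

        public static void main(String[] args) {
            // want 3 tunnels; 2 usable ones expire within 90s; nothing else
            int rv = countHowManyToBuild(false, 0, 2, 0, 0, 0, 0, 3, 0, 0, false);
            System.out.println(rv); // 12: raw demand of 14, capped at 4*3
        }
    }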
@ -518,11 +575,11 @@ public class TunnelPool {
             // no inbound or outbound tunnels to send the request through, and
             // the pool is refusing 0 hop tunnels
             if (peers == null) {
-                if (_log.shouldLog(Log.ERROR))
-                    _log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss!");
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("No peers to put in the new tunnel! selectPeers returned null! boo, hiss!");
             } else {
-                if (_log.shouldLog(Log.ERROR))
-                    _log.error("No peers to put in the new tunnel! selectPeers returned an empty list?!");
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("No peers to put in the new tunnel! selectPeers returned an empty list?!");
             }
             return null;
         }
@ -73,7 +73,9 @@ public class TunnelPoolManager implements TunnelManagerFacade {

     /** pick an inbound tunnel not bound to a particular destination */
     public TunnelInfo selectInboundTunnel() {
-        TunnelInfo info = _inboundExploratory.selectTunnel();
+        TunnelPool pool = _inboundExploratory;
+        if (pool == null) return null;
+        TunnelInfo info = pool.selectTunnel();
         if (info == null) {
             _inboundExploratory.buildFallback();
             // still can be null, but probably not

@ -100,11 +102,13 @@ public class TunnelPoolManager implements TunnelManagerFacade {

     /** pick an outbound tunnel not bound to a particular destination */
     public TunnelInfo selectOutboundTunnel() {
-        TunnelInfo info = _outboundExploratory.selectTunnel();
+        TunnelPool pool = _outboundExploratory;
+        if (pool == null) return null;
+        TunnelInfo info = pool.selectTunnel();
         if (info == null) {
-            _outboundExploratory.buildFallback();
+            pool.buildFallback();
             // still can be null, but probably not
-            info = _outboundExploratory.selectTunnel();
+            info = pool.selectTunnel();
         }
         return info;
     }
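Both selectors now guard against the exploratory pool reference being null (possible briefly around startup or shutdown) before dereferencing it. Note the asymmetry in the two hunks as shown: the outbound variant is fully converted to the local pool reference, while the inbound one still reaches for the _inboundExploratory field inside the fallback branch. The shared pattern, as a sketch with a hypothetical Pool type standing in for TunnelPool:

    // Sketch of the select-with-fallback pattern above; Pool is a
    // hypothetical stand-in for TunnelPool, not the real class.
    public class SelectorSketch {
        interface Pool {
            String selectTunnel();   // may return null
            void buildFallback();
        }

        static String selectTunnel(Pool pool) {
            if (pool == null) return null;      // guard added by this commit
            String info = pool.selectTunnel();
            if (info == null) {
                pool.buildFallback();           // try to create a zero-hop fallback
                info = pool.selectTunnel();     // still can be null, but probably not
            }
            return info;
        }

        public static void main(String[] args) {
            System.out.println(selectTunnel(null)); // prints "null" instead of throwing
        }
    }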