New throttling code which rejects tunnel create requests and networkDb lookup requests, and even tells the I2NP components to stop reading from the network (it doesn't affect writing to the network).
The simple RouterThrottleImpl bases its decision entirely on how congested the jobQueue is: if there are jobs that have been waiting 5+ seconds, it rejects everything and stops reading from the network (each I2NPMessageReader randomly waits 0.5-1s when throttled before rechecking). Minor adjustments to the stats published: removed a few useless ones and added router.throttleNetworkCause (the average ms lag in the jobQueue when an I2NP reader is throttled).
This commit is contained in:
@ -114,6 +114,9 @@ public class I2NPMessageReader {
|
|||||||
public void run() {
|
public void run() {
|
||||||
while (_stayAlive) {
|
while (_stayAlive) {
|
||||||
while (_doRun) {
|
while (_doRun) {
|
||||||
|
while (!_context.throttle().acceptNetworkMessage()) {
|
||||||
|
try { Thread.sleep(500 + _context.random().nextInt(512)); } catch (InterruptedException ie) {}
|
||||||
|
}
|
||||||
// do read
|
// do read
|
||||||
try {
|
try {
|
||||||
I2NPMessage msg = _handler.readMessage(_stream);
|
I2NPMessage msg = _handler.readMessage(_stream);
|
||||||
|
@ -49,6 +49,7 @@ public class RouterContext extends I2PAppContext {
|
|||||||
private Shitlist _shitlist;
|
private Shitlist _shitlist;
|
||||||
private MessageValidator _messageValidator;
|
private MessageValidator _messageValidator;
|
||||||
private MessageStateMonitor _messageStateMonitor;
|
private MessageStateMonitor _messageStateMonitor;
|
||||||
|
private RouterThrottle _throttle;
|
||||||
private Calculator _isFailingCalc;
|
private Calculator _isFailingCalc;
|
||||||
private Calculator _integrationCalc;
|
private Calculator _integrationCalc;
|
||||||
private Calculator _speedCalc;
|
private Calculator _speedCalc;
|
||||||
@ -83,6 +84,7 @@ public class RouterContext extends I2PAppContext {
|
|||||||
_statPublisher = new StatisticsManager(this);
|
_statPublisher = new StatisticsManager(this);
|
||||||
_shitlist = new Shitlist(this);
|
_shitlist = new Shitlist(this);
|
||||||
_messageValidator = new MessageValidator(this);
|
_messageValidator = new MessageValidator(this);
|
||||||
|
_throttle = new RouterThrottleImpl(this);
|
||||||
_isFailingCalc = new IsFailingCalculator(this);
|
_isFailingCalc = new IsFailingCalculator(this);
|
||||||
_integrationCalc = new IntegrationCalculator(this);
|
_integrationCalc = new IntegrationCalculator(this);
|
||||||
_speedCalc = new SpeedCalculator(this);
|
_speedCalc = new SpeedCalculator(this);
|
||||||
@ -188,6 +190,11 @@ public class RouterContext extends I2PAppContext {
|
|||||||
* well as other criteria for "validity".
|
* well as other criteria for "validity".
|
||||||
*/
|
*/
|
||||||
public MessageValidator messageValidator() { return _messageValidator; }
|
public MessageValidator messageValidator() { return _messageValidator; }
|
||||||
|
/**
|
||||||
|
* Component to coordinate our accepting/rejecting of requests under load
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public RouterThrottle throttle() { return _throttle; }
|
||||||
|
|
||||||
/** how do we rank the failure of profiles? */
|
/** how do we rank the failure of profiles? */
|
||||||
public Calculator isFailingCalculator() { return _isFailingCalc; }
|
public Calculator isFailingCalculator() { return _isFailingCalc; }
|
||||||
|
34
router/java/src/net/i2p/router/RouterThrottle.java
Normal file
34
router/java/src/net/i2p/router/RouterThrottle.java
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package net.i2p.router;
|
||||||
|
|
||||||
|
import net.i2p.data.Hash;
|
||||||
|
import net.i2p.data.i2np.TunnelCreateMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gatekeeper for deciding whether to throttle the further processing
|
||||||
|
* of messages through the router. This is seperate from the bandwidth
|
||||||
|
* limiting which simply makes sure the bytes transferred dont exceed the
|
||||||
|
* bytes allowed (though the router throttle should take into account the
|
||||||
|
* current bandwidth usage and limits when determining whether to accept or
|
||||||
|
* reject certain activities, such as tunnels)
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface RouterThrottle {
|
||||||
|
/**
|
||||||
|
* Should we accept any more data from the network for any sort of message,
|
||||||
|
* taking into account our current load, or should we simply slow down?
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public boolean acceptNetworkMessage();
|
||||||
|
/**
|
||||||
|
* Should we accept the request to participate in the given tunnel,
|
||||||
|
* taking into account our current load and bandwidth usage commitments?
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public boolean acceptTunnelRequest(TunnelCreateMessage msg);
|
||||||
|
/**
|
||||||
|
* Should we accept the netDb lookup message, replying either with the
|
||||||
|
* value or some closer peers, or should we simply drop it due to overload?
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public boolean acceptNetDbLookupRequest(Hash key);
|
||||||
|
}
|
82
router/java/src/net/i2p/router/RouterThrottleImpl.java
Normal file
82
router/java/src/net/i2p/router/RouterThrottleImpl.java
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
package net.i2p.router;
|
||||||
|
|
||||||
|
import net.i2p.data.Hash;
|
||||||
|
import net.i2p.data.i2np.TunnelCreateMessage;
|
||||||
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple throttle that basically stops accepting messages or nontrivial
|
||||||
|
* requests if the jobQueue lag is too large.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class RouterThrottleImpl implements RouterThrottle {
|
||||||
|
private RouterContext _context;
|
||||||
|
private Log _log;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* arbitrary hard limit of 5 seconds - if its taking this long to get
|
||||||
|
* to a job, we're congested.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static int JOB_LAG_LIMIT = 5000;
|
||||||
|
|
||||||
|
public RouterThrottleImpl(RouterContext context) {
|
||||||
|
_context = context;
|
||||||
|
_log = context.logManager().getLog(RouterThrottleImpl.class);
|
||||||
|
_context.statManager().createRateStat("router.throttleNetworkCause", "How lagged the jobQueue was when an I2NP was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||||
|
_context.statManager().createRateStat("router.throttleNetDbCause", "How lagged the jobQueue was when a networkDb request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||||
|
_context.statManager().createRateStat("router.throttleTunnelCause", "How lagged the jobQueue was when a tunnel request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||||
|
_context.statManager().createRateStat("tunnel.bytesAllocatedAtAccept", "How many bytes had been 'allocated' for participating tunnels when we accepted a request?", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean acceptNetworkMessage() {
|
||||||
|
long lag = _context.jobQueue().getMaxLag();
|
||||||
|
if (lag > JOB_LAG_LIMIT) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Throttling network reader, as the job lag is " + lag);
|
||||||
|
_context.statManager().addRateData("router.throttleNetworkCause", lag, lag);
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean acceptNetDbLookupRequest(Hash key) {
|
||||||
|
long lag = _context.jobQueue().getMaxLag();
|
||||||
|
if (lag > JOB_LAG_LIMIT) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Refusing netDb request, as the job lag is " + lag);
|
||||||
|
_context.statManager().addRateData("router.throttleNetDbCause", lag, lag);
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public boolean acceptTunnelRequest(TunnelCreateMessage msg) {
|
||||||
|
long lag = _context.jobQueue().getMaxLag();
|
||||||
|
if (lag > JOB_LAG_LIMIT) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Refusing tunnel request, as the job lag is " + lag);
|
||||||
|
_context.statManager().addRateData("router.throttleTunnelCause", lag, lag);
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
// ok, we're not hosed, but can we handle the bandwidth requirements
|
||||||
|
// of another tunnel?
|
||||||
|
double msgsPerTunnel = _context.statManager().getRate("tunnel.participatingMessagesProcessed").getRate(10*60*1000).getAverageValue();
|
||||||
|
double bytesPerMsg = _context.statManager().getRate("tunnel.relayMessageSize").getRate(10*60*1000).getAverageValue();
|
||||||
|
double bytesPerTunnel = msgsPerTunnel * bytesPerMsg;
|
||||||
|
|
||||||
|
|
||||||
|
int numTunnels = _context.tunnelManager().getParticipatingCount();
|
||||||
|
double bytesAllocated = (numTunnels + 1) * bytesPerTunnel;
|
||||||
|
|
||||||
|
_context.statManager().addRateData("tunnel.bytesAllocatedAtAccept", (long)bytesAllocated, msg.getTunnelDurationSeconds()*1000);
|
||||||
|
// todo: um, throttle (include bw usage of the netDb, our own tunnels, the clients,
|
||||||
|
// and check to see that they are less than the bandwidth limits
|
||||||
|
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Accepting a new tunnel request (now allocating " + bytesAllocated + " bytes across " + numTunnels + " tunnels");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -106,7 +106,7 @@ public class StatisticsManager implements Service {
|
|||||||
includeRate("crypto.garlic.decryptFail", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
includeRate("crypto.garlic.decryptFail", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
||||||
includeRate("tunnel.unknownTunnelTimeLeft", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
includeRate("tunnel.unknownTunnelTimeLeft", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
||||||
includeRate("jobQueue.readyJobs", stats, new long[] { 60*1000, 60*60*1000 });
|
includeRate("jobQueue.readyJobs", stats, new long[] { 60*1000, 60*60*1000 });
|
||||||
includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
//includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
||||||
includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 });
|
||||||
includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 });
|
includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l });
|
includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l });
|
||||||
@ -114,6 +114,7 @@ public class StatisticsManager implements Service {
|
|||||||
includeRate("tunnel.inboundMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 });
|
includeRate("tunnel.inboundMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 });
|
||||||
includeRate("tunnel.participatingMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 });
|
includeRate("tunnel.participatingMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 });
|
||||||
includeRate("tunnel.expiredAfterAcceptTime", stats, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
includeRate("tunnel.expiredAfterAcceptTime", stats, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
|
includeRate("tunnel.bytesAllocatedAtAccept", stats, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 });
|
includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 });
|
includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 });
|
includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
@ -121,16 +122,17 @@ public class StatisticsManager implements Service {
|
|||||||
includeRate("netDb.successPeers", stats, new long[] { 60*60*1000 });
|
includeRate("netDb.successPeers", stats, new long[] { 60*60*1000 });
|
||||||
includeRate("netDb.failedPeers", stats, new long[] { 60*60*1000 });
|
includeRate("netDb.failedPeers", stats, new long[] { 60*60*1000 });
|
||||||
includeRate("netDb.searchCount", stats, new long[] { 3*60*60*1000});
|
includeRate("netDb.searchCount", stats, new long[] { 3*60*60*1000});
|
||||||
includeRate("inNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 });
|
//includeRate("inNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 });
|
||||||
includeRate("outNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 });
|
//includeRate("outNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 });
|
||||||
|
includeRate("router.throttleNetworkCause", stats, new long[] { 10*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.receiveMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
|
includeRate("transport.receiveMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.sendMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.sendMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.sendMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.sendMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.sendMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.sendMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.receiveMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.receiveMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
|
//includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||||
includeRate("client.sendAckTime", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
includeRate("client.sendAckTime", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
||||||
stats.setProperty("stat_uptime", DataHelper.formatDuration(_context.router().getUptime()));
|
stats.setProperty("stat_uptime", DataHelper.formatDuration(_context.router().getUptime()));
|
||||||
stats.setProperty("stat__rateKey", "avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]");
|
stats.setProperty("stat__rateKey", "avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]");
|
||||||
|
@ -16,6 +16,7 @@ import net.i2p.data.i2np.SourceRouteBlock;
|
|||||||
import net.i2p.router.HandlerJobBuilder;
|
import net.i2p.router.HandlerJobBuilder;
|
||||||
import net.i2p.router.Job;
|
import net.i2p.router.Job;
|
||||||
import net.i2p.router.RouterContext;
|
import net.i2p.router.RouterContext;
|
||||||
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build a HandleDatabaseLookupMessageJob whenever a DatabaseLookupMessage arrives
|
* Build a HandleDatabaseLookupMessageJob whenever a DatabaseLookupMessage arrives
|
||||||
@ -23,14 +24,24 @@ import net.i2p.router.RouterContext;
|
|||||||
*/
|
*/
|
||||||
public class DatabaseLookupMessageHandler implements HandlerJobBuilder {
|
public class DatabaseLookupMessageHandler implements HandlerJobBuilder {
|
||||||
private RouterContext _context;
|
private RouterContext _context;
|
||||||
|
private Log _log;
|
||||||
public DatabaseLookupMessageHandler(RouterContext context) {
|
public DatabaseLookupMessageHandler(RouterContext context) {
|
||||||
_context = context;
|
_context = context;
|
||||||
|
_log = context.logManager().getLog(DatabaseLookupMessageHandler.class);
|
||||||
_context.statManager().createRateStat("netDb.lookupsReceived", "How many netDb lookups have we received?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
_context.statManager().createRateStat("netDb.lookupsReceived", "How many netDb lookups have we received?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
|
_context.statManager().createRateStat("netDb.lookupsDropped", "How many netDb lookups did we drop due to throttling?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
}
|
}
|
||||||
|
|
||||||
public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) {
|
public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) {
|
||||||
_context.statManager().addRateData("netDb.lookupsReceived", 1, 0);
|
_context.statManager().addRateData("netDb.lookupsReceived", 1, 0);
|
||||||
// ignore the reply block for the moment
|
|
||||||
return new HandleDatabaseLookupMessageJob(_context, (DatabaseLookupMessage)receivedMessage, from, fromHash);
|
if (_context.throttle().acceptNetDbLookupRequest(((DatabaseLookupMessage)receivedMessage).getSearchKey())) {
|
||||||
|
return new HandleDatabaseLookupMessageJob(_context, (DatabaseLookupMessage)receivedMessage, from, fromHash);
|
||||||
|
} else {
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Dropping lookup request as throttled");
|
||||||
|
_context.statManager().addRateData("netDb.lookupsDropped", 1, 1);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,6 +39,7 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
|
|||||||
RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) {
|
RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) {
|
||||||
super(ctx);
|
super(ctx);
|
||||||
_log = ctx.logManager().getLog(HandleTunnelCreateMessageJob.class);
|
_log = ctx.logManager().getLog(HandleTunnelCreateMessageJob.class);
|
||||||
|
ctx.statManager().createRateStat("tunnel.rejectOverloaded", "How many tunnels did we deny due to throttling?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||||
_message = receivedMessage;
|
_message = receivedMessage;
|
||||||
_from = from;
|
_from = from;
|
||||||
_fromHash = fromHash;
|
_fromHash = fromHash;
|
||||||
@ -83,8 +84,13 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean isOverloaded() {
|
private boolean isOverloaded() {
|
||||||
// hmmm....
|
boolean shouldAccept = _context.throttle().acceptTunnelRequest(_message);
|
||||||
return false;
|
if (!shouldAccept) {
|
||||||
|
_context.statManager().addRateData("tunnel.rejectOverloaded", 1, 1);
|
||||||
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
_log.info("Refusing tunnel request due to overload");
|
||||||
|
}
|
||||||
|
return !shouldAccept;
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TestJob extends JobImpl {
|
private class TestJob extends JobImpl {
|
||||||
|
Reference in New Issue
Block a user