* BuildHandler:

- Limit request queue size
      - Concurrent request queue
      - Remove dead code for queued rely handling
This commit is contained in:
zzz
2011-03-02 17:18:37 +00:00
parent 28bd1802b4
commit f938cc7b0c
3 changed files with 100 additions and 211 deletions

View File

@ -709,7 +709,8 @@ public class TunnelDispatcher implements Service {
_validator = new BloomFilterIVValidator(_context, getShareBandwidth(_context));
}
private static int getShareBandwidth(RouterContext ctx) {
/** @return in KBps */
public static int getShareBandwidth(RouterContext ctx) {
int irateKBps = ctx.bandwidthLimiter().getInboundKBytesPerSecond();
int orateKBps = ctx.bandwidthLimiter().getOutboundKBytesPerSecond();
double pct = ctx.router().getSharePercentage();

View File

@ -27,9 +27,9 @@ import net.i2p.util.Log;
*/
class BuildExecutor implements Runnable {
private final ArrayList<Long> _recentBuildIds = new ArrayList(100);
private RouterContext _context;
private Log _log;
private TunnelPoolManager _manager;
private final RouterContext _context;
private final Log _log;
private final TunnelPoolManager _manager;
/** list of TunnelCreatorConfig elements of tunnels currently being built */
private final Object _currentlyBuilding;
/** indexed by ptcc.getReplyMessageId() */
@ -37,7 +37,7 @@ class BuildExecutor implements Runnable {
/** indexed by ptcc.getReplyMessageId() */
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _recentlyBuildingMap;
private boolean _isRunning;
private BuildHandler _handler;
private final BuildHandler _handler;
private boolean _repoll;
private static final int MAX_CONCURRENT_BUILDS = 10;
/** accept replies up to a minute after we gave up on them */
@ -248,8 +248,6 @@ class BuildExecutor implements Runnable {
int pendingRemaining = 0;
//long loopBegin = 0;
//long beforeHandleInboundReplies = 0;
//long afterHandleInboundReplies = 0;
//long afterBuildZeroHop = 0;
long afterBuildReal = 0;
long afterHandleInbound = 0;
@ -268,10 +266,6 @@ class BuildExecutor implements Runnable {
wanted.add(pool);
}
//beforeHandleInboundReplies = System.currentTimeMillis();
_handler.handleInboundReplies();
//afterHandleInboundReplies = System.currentTimeMillis();
// allowed() also expires timed out requests (for new style requests)
int allowed = allowed();
@ -327,9 +321,6 @@ class BuildExecutor implements Runnable {
_log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg);
buildTunnel(pool, cfg);
realBuilt++;
// we want replies to go to the top of the queue
_handler.handleInboundReplies();
} else {
i--;
}
@ -391,10 +382,8 @@ class BuildExecutor implements Runnable {
* This prevents a large number of client pools from starving the exploratory pool.
*
*/
private static class TunnelPoolComparator implements Comparator {
public int compare(Object l, Object r) {
TunnelPool tpl = (TunnelPool) l;
TunnelPool tpr = (TunnelPool) r;
private static class TunnelPoolComparator implements Comparator<TunnelPool> {
public int compare(TunnelPool tpl, TunnelPool tpr) {
if (tpl.getSettings().isExploratory() && !tpr.getSettings().isExploratory())
return -1;
if (tpr.getSettings().isExploratory() && !tpl.getSettings().isExploratory())

View File

@ -1,7 +1,7 @@
package net.i2p.router.tunnel.pool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
@ -27,14 +27,20 @@ import net.i2p.router.peermanager.TunnelHistory;
import net.i2p.router.tunnel.BuildMessageProcessor;
import net.i2p.router.tunnel.BuildReplyHandler;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.router.tunnel.TunnelDispatcher;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
/**
* Handle the received tunnel build message requests and replies,
* including sending responsses to requests, updating the
* lists of our tunnels and participating tunnels,
* and updating stats.
*
* Replies are handled immediately on reception; requests are queued.
*
* Note that 10 minute tunnel expiration is hardcoded in here.
*
*/
class BuildHandler {
private final RouterContext _context;
@ -42,29 +48,21 @@ class BuildHandler {
private final BuildExecutor _exec;
private final Job _buildMessageHandlerJob;
private final Job _buildReplyMessageHandlerJob;
/** list of BuildMessageState, oldest first */
private final List<BuildMessageState> _inboundBuildMessages;
/** list of BuildReplyMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
private final List<BuildReplyMessageState> _inboundBuildReplyMessages;
/** list of BuildEndMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
private final List<BuildEndMessageState> _inboundBuildEndMessages;
private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages;
private final BuildMessageProcessor _processor;
private final ParticipatingThrottler _throttler;
private static final boolean HANDLE_REPLIES_INLINE = true;
/** TODO these may be too high, review and adjust */
private static final int MIN_QUEUE = 12;
private static final int MAX_QUEUE = 96;
public BuildHandler(RouterContext ctx, BuildExecutor exec) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_exec = exec;
_inboundBuildMessages = new ArrayList(16);
if (HANDLE_REPLIES_INLINE) {
_inboundBuildEndMessages = null;
_inboundBuildReplyMessages = null;
} else {
_inboundBuildEndMessages = new ArrayList(16);
_inboundBuildReplyMessages = new ArrayList(16);
}
// Queue size = 12 * share BW / 48K
int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48));
_inboundBuildMessages = new LinkedBlockingQueue(sz);
_context.statManager().createRateStat("tunnel.reject.10", "How often we reject a tunnel probabalistically", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.reject.20", "How often we reject a tunnel because of transient overload", "Tunnels", new long[] { 60*1000, 10*60*1000 });
@ -94,6 +92,7 @@ class BuildHandler {
_context.statManager().createRateStat("tunnel.corruptBuildReply", "", "Tunnels", new long[] { 24*60*60*1000l });
_processor = new BuildMessageProcessor(ctx);
_throttler = new ParticipatingThrottler(ctx);
_buildMessageHandlerJob = new TunnelBuildMessageHandlerJob(ctx);
_buildReplyMessageHandlerJob = new TunnelBuildReplyMessageHandlerJob(ctx);
TunnelBuildMessageHandlerJobBuilder tbmhjb = new TunnelBuildMessageHandlerJobBuilder();
@ -102,7 +101,6 @@ class BuildHandler {
ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildMessage.MESSAGE_TYPE, tbmhjb);
ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
_throttler = new ParticipatingThrottler(ctx);
}
private static final int MAX_HANDLE_AT_ONCE = 2;
@ -110,122 +108,37 @@ class BuildHandler {
/**
* Blocking call to handle a few of the pending inbound requests, returning how many
* requests remain after this pass
* requests remain after this pass. This is called by BuildExecutor.
*/
int handleInboundRequests() {
int dropExpired = 0;
int remaining = 0;
List handled = null;
long beforeFindHandled = System.currentTimeMillis();
synchronized (_inboundBuildMessages) {
int toHandle = _inboundBuildMessages.size();
if (toHandle > 0) {
if (toHandle > MAX_HANDLE_AT_ONCE)
toHandle = MAX_HANDLE_AT_ONCE;
handled = new ArrayList(toHandle);
//if (false) {
// for (int i = 0; i < toHandle; i++) // LIFO for lower response time (should we RED it for DoS?)
// handled.add(_inboundBuildMessages.remove(_inboundBuildMessages.size()-1));
//} else {
// drop any expired messages
long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
do {
BuildMessageState state = (BuildMessageState)_inboundBuildMessages.get(0);
if (state.recvTime <= dropBefore) {
_inboundBuildMessages.remove(0);
dropExpired++;
if (_log.shouldLog(Log.WARN))
_log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
+ ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime));
_context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0);
} else {
break;
}
} while (!_inboundBuildMessages.isEmpty());
if (dropExpired > 0)
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
for (int i = 0; i < MAX_HANDLE_AT_ONCE; ) {
BuildMessageState state = _inboundBuildMessages.poll();
if (state == null)
return 0;
long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
if (state.recvTime <= dropBefore) {
if (_log.shouldLog(Log.WARN))
_log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
+ ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime));
_context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0);
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
continue;
}
// now pull off the oldest requests first (we're doing a tail-drop
// when adding)
for (int i = 0; i < toHandle && !_inboundBuildMessages.isEmpty(); i++)
handled.add(_inboundBuildMessages.remove(0));
//}
}
remaining = _inboundBuildMessages.size();
}
if (handled != null) {
i++;
long beforeHandle = System.currentTimeMillis();
long actualTime = handleRequest(state);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling " + handled.size() + " requests (took " + (System.currentTimeMillis()-beforeFindHandled) + "ms to find them)");
for (int i = 0; i < handled.size(); i++) {
BuildMessageState state = (BuildMessageState)handled.get(i);
long beforeHandle = System.currentTimeMillis();
long actualTime = handleRequest(state);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime + " (" + i + " out of " + handled.size() + " with " + remaining + " remaining)");
}
handled.clear();
_log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime +
" (" + i + " with " + _inboundBuildMessages.size() + " remaining)");
}
if (!HANDLE_REPLIES_INLINE) {
synchronized (_inboundBuildEndMessages) {
int toHandle = _inboundBuildEndMessages.size();
if (toHandle > 0) {
if (handled == null)
handled = new ArrayList(_inboundBuildEndMessages);
else
handled.addAll(_inboundBuildEndMessages);
_inboundBuildEndMessages.clear();
}
}
}
if (handled != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling " + handled.size() + " requests that are actually replies");
// these are inbound build messages that actually contain the full replies, since
// they are for inbound tunnels we have created
for (int i = 0; i < handled.size(); i++) {
BuildEndMessageState state = (BuildEndMessageState)handled.get(i);
handleRequestAsInboundEndpoint(state);
}
}
// anything else?
/*
synchronized (_inboundBuildMessages) {
int remaining = _inboundBuildMessages.size();
return remaining;
}
*/
int remaining = _inboundBuildMessages.size();
if (remaining > 0)
_context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
return remaining;
}
/** Warning - noop if HANDLE_REPLIES_INLINE == true */
void handleInboundReplies() {
if (HANDLE_REPLIES_INLINE)
return;
List handled = null;
synchronized (_inboundBuildReplyMessages) {
int toHandle = _inboundBuildReplyMessages.size();
if (toHandle > 0) {
// always handle all of them - they're replies that we were waiting for!
handled = new ArrayList(_inboundBuildReplyMessages);
_inboundBuildReplyMessages.clear();
}
}
if (handled != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling " + handled.size() + " replies");
for (int i = 0; i < handled.size(); i++) {
BuildReplyMessageState state = (BuildReplyMessageState)handled.get(i);
handleReply(state);
}
}
}
private void handleReply(BuildReplyMessageState state) {
// search through the tunnels for a reply
long replyMessageId = state.msg.getUniqueId();
@ -343,7 +256,7 @@ class BuildHandler {
}
}
/** @return handle time or -1 */
/** @return handle time or -1 if it wasn't completely handled */
private long handleRequest(BuildMessageState state) {
long timeSinceReceived = System.currentTimeMillis()-state.recvTime;
if (_log.shouldLog(Log.DEBUG))
@ -365,7 +278,7 @@ class BuildHandler {
BuildRequestRecord req = _processor.decrypt(_context, state.msg, _context.routerHash(), _context.keyManager().getPrivateKey());
long decryptTime = System.currentTimeMillis() - beforeDecrypt;
_context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime, decryptTime);
if (decryptTime > 500)
if (decryptTime > 500 && _log.shouldLog(Log.WARN))
_log.warn("Took too long to decrypt the request: " + decryptTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
if (req == null) {
// no records matched, or the decryption failed. bah
@ -379,7 +292,7 @@ class BuildHandler {
long readPeerTime = System.currentTimeMillis()-beforeLookup;
RouterInfo nextPeerInfo = _context.netDb().lookupRouterInfoLocally(nextPeer);
long lookupTime = System.currentTimeMillis()-beforeLookup;
if (lookupTime > 500)
if (lookupTime > 500 && _log.shouldLog(Log.WARN))
_log.warn("Took too long to lookup the request: " + lookupTime + "/" + readPeerTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
if (nextPeerInfo == null) {
if (_log.shouldLog(Log.DEBUG))
@ -416,9 +329,9 @@ class BuildHandler {
}
private class HandleReq extends JobImpl {
private BuildMessageState _state;
private BuildRequestRecord _req;
private Hash _nextPeer;
private final BuildMessageState _state;
private final BuildRequestRecord _req;
private final Hash _nextPeer;
HandleReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
super(ctx);
_state = state;
@ -439,9 +352,9 @@ class BuildHandler {
}
private static class TimeoutReq extends JobImpl {
private BuildMessageState _state;
private BuildRequestRecord _req;
private Hash _nextPeer;
private final BuildMessageState _state;
private final BuildRequestRecord _req;
private final Hash _nextPeer;
TimeoutReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
super(ctx);
_state = state;
@ -580,7 +493,7 @@ class BuildHandler {
Hash from = state.fromHash;
if (from == null)
from = state.from.calculateHash();
if (_throttler.shouldThrottle(from)) {
if (from != null && _throttler.shouldThrottle(from)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting tunnel (hop throttle), previous hop: " + from);
// no setTunnelStatus() indication
@ -731,9 +644,7 @@ class BuildHandler {
}
public int getInboundBuildQueueSize() {
synchronized (_inboundBuildMessages) {
return _inboundBuildMessages.size();
}
}
/**
@ -756,14 +667,7 @@ class BuildHandler {
_log.error("received it, but its not inbound? " + cfg);
}
BuildEndMessageState state = new BuildEndMessageState(cfg, receivedMessage);
if (HANDLE_REPLIES_INLINE) {
handleRequestAsInboundEndpoint(state);
} else {
synchronized (_inboundBuildEndMessages) {
_inboundBuildEndMessages.add(state);
}
_exec.repoll();
}
handleRequestAsInboundEndpoint(state);
} else {
if (_exec.wasRecentlyBuilding(reqId)) {
// we are the IBEP but we already gave up?
@ -771,52 +675,53 @@ class BuildHandler {
_log.warn("Dropping the reply " + reqId + ", as we used to be building that");
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
} else {
synchronized (_inboundBuildMessages) {
boolean removed = false;
int dropped = 0;
for (int i = 0; i < _inboundBuildMessages.size(); i++) {
BuildMessageState cur = (BuildMessageState)_inboundBuildMessages.get(i);
long age = System.currentTimeMillis() - cur.recvTime;
if (age >= BuildRequestor.REQUEST_TIMEOUT/4) {
_inboundBuildMessages.remove(i);
i--;
dropped++;
_context.statManager().addRateData("tunnel.dropLoad", age, _inboundBuildMessages.size());
}
}
if (dropped > 0) {
int sz = _inboundBuildMessages.size();
BuildMessageState cur = _inboundBuildMessages.peek();
boolean accept = true;
if (cur != null) {
long age = System.currentTimeMillis() - cur.recvTime;
if (age >= BuildRequestor.REQUEST_TIMEOUT/4) {
_context.statManager().addRateData("tunnel.dropLoad", age, sz);
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
// if the queue is backlogged, stop adding new messages
_context.statManager().addRateData("tunnel.dropLoadBacklog", _inboundBuildMessages.size(), _inboundBuildMessages.size());
_context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
accept = false;
}
}
if (accept) {
int queueTime = estimateQueueTime(sz);
float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
pDrop = (float)Math.pow(pDrop, 16); // steeeep
float f = _context.random().nextFloat();
//if ( (pDrop > f) && (allowProactiveDrop()) ) {
if (pDrop > f) {
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
_context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz);
} else {
int queueTime = estimateQueueTime(_inboundBuildMessages.size());
float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
pDrop = (float)Math.pow(pDrop, 16); // steeeep
float f = _context.random().nextFloat();
if ( (pDrop > f) && (allowProactiveDrop()) ) {
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
_context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, _inboundBuildMessages.size());
accept = _inboundBuildMessages.offer(new BuildMessageState(receivedMessage, from, fromHash));
if (accept) {
// wake up the Executor to call handleInboundRequests()
_exec.repoll();
} else {
_inboundBuildMessages.add(new BuildMessageState(receivedMessage, from, fromHash));
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
_context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
}
}
}
_exec.repoll();
}
}
return _buildMessageHandlerJob;
}
}
/****
private boolean allowProactiveDrop() {
String allow = _context.getProperty("router.allowProactiveDrop", "true");
boolean rv = false;
if ( (allow == null) || (Boolean.valueOf(allow).booleanValue()) )
rv = true;
boolean rv = _context.getBooleanPropertyDefaultTrue("router.allowProactiveDrop");
if (!rv)
_context.statManager().addRateData("tunnel.dropLoadProactiveAbort", 1, 0);
return rv;
}
****/
private int estimateQueueTime(int numPendingMessages) {
int decryptTime = 200;
@ -845,24 +750,17 @@ class BuildHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive tunnel build reply message " + receivedMessage.getUniqueId() + " from "
+ (fromHash != null ? fromHash.toBase64() : from != null ? from.calculateHash().toBase64() : "a tunnel"));
if (HANDLE_REPLIES_INLINE) {
handleReply(new BuildReplyMessageState(receivedMessage));
} else {
synchronized (_inboundBuildReplyMessages) {
_inboundBuildReplyMessages.add(new BuildReplyMessageState(receivedMessage));
}
_exec.repoll();
}
handleReply(new BuildReplyMessageState(receivedMessage));
return _buildReplyMessageHandlerJob;
}
}
/** normal inbound requests from other people */
private static class BuildMessageState {
TunnelBuildMessage msg;
RouterIdentity from;
Hash fromHash;
long recvTime;
final TunnelBuildMessage msg;
final RouterIdentity from;
final Hash fromHash;
final long recvTime;
public BuildMessageState(I2NPMessage m, RouterIdentity f, Hash h) {
msg = (TunnelBuildMessage)m;
from = f;
@ -872,8 +770,8 @@ class BuildHandler {
}
/** replies for outbound tunnels that we have created */
private static class BuildReplyMessageState {
TunnelBuildReplyMessage msg;
long recvTime;
final TunnelBuildReplyMessage msg;
final long recvTime;
public BuildReplyMessageState(I2NPMessage m) {
msg = (TunnelBuildReplyMessage)m;
recvTime = System.currentTimeMillis();
@ -881,9 +779,9 @@ class BuildHandler {
}
/** replies for inbound tunnels we have created */
private static class BuildEndMessageState {
TunnelBuildMessage msg;
PooledTunnelCreatorConfig cfg;
long recvTime;
final TunnelBuildMessage msg;
final PooledTunnelCreatorConfig cfg;
final long recvTime;
public BuildEndMessageState(PooledTunnelCreatorConfig c, I2NPMessage m) {
cfg = c;
msg = (TunnelBuildMessage)m;
@ -891,13 +789,14 @@ class BuildHandler {
}
}
// noop
/** noop */
private static class TunnelBuildMessageHandlerJob extends JobImpl {
private TunnelBuildMessageHandlerJob(RouterContext ctx) { super(ctx); }
public void runJob() {}
public String getName() { return "Receive tunnel build message"; }
}
// noop
/** noop */
private static class TunnelBuildReplyMessageHandlerJob extends JobImpl {
private TunnelBuildReplyMessageHandlerJob(RouterContext ctx) { super(ctx); }
public void runJob() {}
@ -910,7 +809,7 @@ class BuildHandler {
* but it affects capacity calculations
*/
private static class TunnelBuildNextHopFailJob extends JobImpl {
HopConfig _cfg;
final HopConfig _cfg;
private TunnelBuildNextHopFailJob(RouterContext ctx, HopConfig cfg) {
super(ctx);
_cfg = cfg;