- Send exploratory lookups directly to the floodfill if
    we are already connected to him
  - Don't encrypt RI lookups when overloaded
  - Don't explore when overloaded
  - SearchJob cleanups
Tunnels: Drop instead of reject requests on high job lag
This commit is contained in:
zzz
2015-03-18 12:10:30 +00:00
parent 0af1f67c33
commit d7a88db87a
6 changed files with 74 additions and 27 deletions

View File

@ -115,6 +115,10 @@ class RouterThrottleImpl implements RouterThrottle {
}
/**
* If we should send a reject, return a nonzero reject code.
* Anything that causes us to drop a request instead of rejecting it
* must go in BuildHandler.handleInboundRequest(), not here.
*
* @return 0 for accept or nonzero reject code
*/
public int acceptTunnelRequest() {
@ -132,6 +136,7 @@ class RouterThrottleImpl implements RouterThrottle {
return TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
/**** Moved to BuildHandler
long lag = _context.jobQueue().getMaxLag();
if (lag > JOB_LAG_LIMIT_TUNNEL) {
if (_log.shouldLog(Log.WARN))
@ -140,6 +145,7 @@ class RouterThrottleImpl implements RouterThrottle {
setTunnelStatus(_x("Rejecting tunnels: High job lag"));
return TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
****/
RateAverages ra = RateAverages.getTemp();

View File

@ -73,8 +73,8 @@ class ExploreJob extends SearchJob {
* and PeerSelector doesn't include the floodfill peers,
* so we add the ff peers ourselves and then use the regular PeerSelector.
*
* @param replyTunnelId tunnel to receive replies through
* @param replyGateway gateway for the reply tunnel
* @param replyTunnelId tunnel to receive replies through, or our router hash if replyGateway is null
* @param replyGateway gateway for the reply tunnel; if null, we are sending direct, so do not encrypt
* @param expiration when the search should stop
* @param peer the peer to send it to
*
@ -89,7 +89,8 @@ class ExploreJob extends SearchJob {
//msg.setDontIncludePeers(getState().getClosestAttempted(MAX_CLOSEST));
Set<Hash> dontIncludePeers = getState().getClosestAttempted(MAX_CLOSEST);
msg.setMessageExpiration(expiration);
msg.setReplyTunnel(replyTunnelId);
if (replyTunnelId != null)
msg.setReplyTunnel(replyTunnelId);
int available = MAX_CLOSEST - dontIncludePeers.size();
if (available > 0) {
@ -134,7 +135,8 @@ class ExploreJob extends SearchJob {
// Now encrypt if we can
I2NPMessage outMsg;
if (getContext().getProperty(IterativeSearchJob.PROP_ENCRYPT_RI, IterativeSearchJob.DEFAULT_ENCRYPT_RI)) {
if (replyTunnelId != null &&
getContext().getProperty(IterativeSearchJob.PROP_ENCRYPT_RI, IterativeSearchJob.DEFAULT_ENCRYPT_RI)) {
// request encrypted reply?
if (DatabaseLookupMessage.supportsEncryptedReplies(peer)) {
MessageWrapper.OneTimeSession sess;

View File

@ -347,7 +347,9 @@ class IterativeSearchJob extends FloodSearchJob {
I2NPMessage outMsg = null;
if (isDirect) {
// never wrap
} else if (_isLease || getContext().getProperty(PROP_ENCRYPT_RI, DEFAULT_ENCRYPT_RI)) {
} else if (_isLease ||
(getContext().getProperty(PROP_ENCRYPT_RI, DEFAULT_ENCRYPT_RI) &&
getContext().jobQueue().getMaxLag() < 300)) {
// Full ElG is fairly expensive so only do it for LS lookups
// and for RI lookups on fast boxes.
// if we have the ff RI, garlic encrypt it

View File

@ -28,6 +28,7 @@ import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.message.SendMessageDirectJob;
import net.i2p.util.Log;
/**
@ -47,6 +48,7 @@ class SearchJob extends JobImpl {
private final long _expiration;
private final long _timeoutMs;
private final boolean _keepStats;
private final boolean _isLease;
private Job _pendingRequeueJob;
private final PeerSelector _peerSelector;
private final List<Search> _deferredSearches;
@ -88,7 +90,8 @@ class SearchJob extends JobImpl {
* Create a new search for the routingKey specified
*
*/
public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) {
public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key,
Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) {
super(context);
if ( (key == null) || (key.getData() == null) )
throw new IllegalArgumentException("Search for null key? wtf");
@ -99,13 +102,14 @@ class SearchJob extends JobImpl {
_onFailure = onFailure;
_timeoutMs = timeoutMs;
_keepStats = keepStats;
_isLease = isLease;
_deferredSearches = new ArrayList<Search>(0);
_peerSelector = facade.getPeerSelector();
_startedOn = -1;
_expiration = getContext().clock().now() + timeoutMs;
getContext().statManager().addRateData("netDb.searchCount", 1, 0);
getContext().statManager().addRateData("netDb.searchCount", 1);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by"));
_log.debug("Search (" + getClass().getName() + " for " + key, new Exception("Search enqueued by"));
}
public void runJob() {
@ -348,6 +352,7 @@ class SearchJob extends JobImpl {
else
requeuePending(REQUEUE_DELAY);
}
private void requeuePending(long ms) {
if (_pendingRequeueJob == null)
_pendingRequeueJob = new RequeuePending(getContext());
@ -390,17 +395,24 @@ class SearchJob extends JobImpl {
return;
} else {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64()
+ " for " + _state.getTarget().toBase64()
_log.info(getJobId() + ": Send search to " + router.getIdentity().getHash()
+ " for " + _state.getTarget()
+ " w/ timeout " + getPerPeerTimeoutMs(router.getIdentity().calculateHash()));
}
getContext().statManager().addRateData("netDb.searchMessageCount", 1, 0);
getContext().statManager().addRateData("netDb.searchMessageCount", 1);
//if (_isLease || true) // always send searches out tunnels
// To minimize connection congestion, send RI lookups through exploratory tunnels if not connected.
// To minimize crypto overhead and response latency, send RI lookups directly if connected.
// But not too likely since we don't explore when we're floodfill.
// Always send LS lookups thru expl tunnels.
// But this is never used for LSes...
if (_isLease ||
!getContext().commSystem().isEstablished(router.getIdentity().calculateHash()))
sendLeaseSearch(router);
//else
// sendRouterSearch(router);
else
sendRouterSearch(router);
}
@ -461,27 +473,27 @@ class SearchJob extends JobImpl {
}
/** we're searching for a router, so we can just send direct */
/******* always send through the lease
protected void sendRouterSearch(RouterInfo router) {
int timeout = _facade.getPeerTimeout(router.getIdentity().getHash());
long expiration = getContext().clock().now() + timeout;
DatabaseLookupMessage msg = buildMessage(expiration);
// use the 4-arg one so we pick up the override in ExploreJob
//I2NPMessage msg = buildMessage(expiration);
I2NPMessage msg = buildMessage(null, router.getIdentity().getHash(), expiration, router);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64()
+ " for " + msg.getSearchKey().toBase64() + " w/ replies to us ["
+ msg.getFrom().toBase64() + "]");
_log.debug(getJobId() + ": Sending router search directly to " + router.getIdentity().getHash()
+ " for " + _state.getTarget());
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(),
reply, new FailedJob(getContext(), router), sel, timeout, SEARCH_PRIORITY);
reply, new FailedJob(getContext(), router), sel, timeout,
OutNetMessage.PRIORITY_EXPLORATORY);
if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
_floodfillSearchesOutstanding++;
j.runJob();
//getContext().jobQueue().addJob(j);
}
**********/
/**
@ -495,6 +507,8 @@ class SearchJob extends JobImpl {
* @return a DatabaseLookupMessage
*/
protected I2NPMessage buildMessage(TunnelId replyTunnelId, Hash replyGateway, long expiration, RouterInfo peer) {
throw new UnsupportedOperationException("see ExploreJob");
/*******
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true);
msg.setSearchKey(_state.getTarget());
//msg.setFrom(replyGateway.getIdentity().getHash());
@ -503,6 +517,7 @@ class SearchJob extends JobImpl {
msg.setMessageExpiration(expiration);
msg.setReplyTunnel(replyTunnelId);
return msg;
*********/
}
/**
@ -522,6 +537,7 @@ class SearchJob extends JobImpl {
}
*********/
/** found a reply */
void replyFound(DatabaseSearchReplyMessage message, Hash peer) {
long duration = _state.replyFound(peer);
// this processing can take a while, so split 'er up
@ -569,13 +585,13 @@ class SearchJob extends JobImpl {
_state.replyTimeout(_peer);
if (_penalizePeer) {
if (_log.shouldLog(Log.INFO))
_log.info("Penalizing peer for timeout on search: " + _peer.toBase64() + " after " + (getContext().clock().now() - _sentOn));
_log.info("Penalizing peer for timeout on search: " + _peer + " after " + (getContext().clock().now() - _sentOn));
getContext().profileManager().dbLookupFailed(_peer);
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer.toBase64());
_log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer);
}
getContext().statManager().addRateData("netDb.failedPeers", 1, 0);
getContext().statManager().addRateData("netDb.failedPeers", 1);
searchNext();
}
public String getName() { return "Kademlia Search Failed"; }
@ -593,7 +609,7 @@ class SearchJob extends JobImpl {
if (_keepStats) {
long time = getContext().clock().now() - _state.getWhenStarted();
getContext().statManager().addRateData("netDb.successTime", time, 0);
getContext().statManager().addRateData("netDb.successTime", time);
getContext().statManager().addRateData("netDb.successPeers", _state.getAttempted().size(), time);
}
if (_onSuccess != null)
@ -682,7 +698,7 @@ class SearchJob extends JobImpl {
protected void fail() {
if (isLocal()) {
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": why did we fail if the target is local?: " + _state.getTarget().toBase64(), new Exception("failure cause"));
_log.error(getJobId() + ": why did we fail if the target is local?: " + _state.getTarget(), new Exception("failure cause"));
succeed();
return;
}
@ -697,7 +713,7 @@ class SearchJob extends JobImpl {
getContext().statManager().addRateData("netDb.failedAttemptedPeers", attempted, time);
if (_keepStats) {
getContext().statManager().addRateData("netDb.failedTime", time, 0);
getContext().statManager().addRateData("netDb.failedTime", time);
//_facade.fail(_state.getTarget());
}
if (_onFailure != null)
@ -782,6 +798,7 @@ class SearchJob extends JobImpl {
}
boolean wasAttempted(Hash peer) { return _state.wasAttempted(peer); }
long timeoutMs() { return _timeoutMs; }
/** @return true if peer was new */
@ -795,5 +812,6 @@ class SearchJob extends JobImpl {
}
return rv;
}
void decrementOutstandingFloodfillSearches() { _floodfillSearchesOutstanding--; }
}

View File

@ -42,6 +42,8 @@ class StartExplorersJob extends JobImpl {
private static final int MIN_ROUTERS = 250;
/** explore slowly if we have more than this many routers */
private static final int MAX_ROUTERS = 800;
private static final long MAX_LAG = 100;
private static final long MAX_MSG_DELAY = 1500;
public StartExplorersJob(RouterContext context, KademliaNetworkDatabaseFacade facade) {
super(context);
@ -50,8 +52,12 @@ class StartExplorersJob extends JobImpl {
}
public String getName() { return "Start Explorers Job"; }
public void runJob() {
if (! (_facade.floodfillEnabled() ||
getContext().jobQueue().getMaxLag() > MAX_LAG ||
getContext().throttle().getMessageDelay() > MAX_MSG_DELAY ||
// message delay limit also?
getContext().router().gracefulShutdownInProgress())) {
int num = MAX_PER_RUN;
if (_facade.getDataStore().size() < LOW_ROUTERS)

View File

@ -93,6 +93,8 @@ class BuildHandler implements Runnable {
/** must be > 1 hour due to rounding down */
private static final long MAX_REQUEST_AGE = 65*60*1000;
private static final long JOB_LAG_LIMIT_TUNNEL = 350;
public BuildHandler(RouterContext ctx, TunnelPoolManager manager, BuildExecutor exec) {
_context = ctx;
@ -248,6 +250,17 @@ class BuildHandler implements Runnable {
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
return;
}
long lag = _context.jobQueue().getMaxLag();
// TODO reject instead of drop also for a lower limit? see throttle
if (lag > JOB_LAG_LIMIT_TUNNEL) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping tunnel request, as the job lag is " + lag);
_context.statManager().addRateData("router.throttleTunnelCause", lag);
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High job lag"));
return;
}
handleRequest(state);
//int remaining = _inboundBuildMessages.size();