* Router: Shutdown clients first
* Throttle: - Use 60s rather than 10m tunnel.participatingMessageCount stat - Fix a summary bar message * Tunnel Dispatcher: Update tunnel.participatingMessageCount every 20s, rather than at tunnel expiration, to maintain a more current stat
This commit is contained in:
@ -806,10 +806,10 @@ public class Router {
|
|||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
_log.log(Log.CRIT, "Error running shutdown task", t);
|
_log.log(Log.CRIT, "Error running shutdown task", t);
|
||||||
}
|
}
|
||||||
|
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the client manager", t); }
|
||||||
try { _context.jobQueue().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the job queue", t); }
|
try { _context.jobQueue().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the job queue", t); }
|
||||||
//try { _context.adminManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the admin manager", t); }
|
//try { _context.adminManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the admin manager", t); }
|
||||||
try { _context.statPublisher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the stats manager", t); }
|
try { _context.statPublisher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the stats manager", t); }
|
||||||
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the client manager", t); }
|
|
||||||
try { _context.tunnelManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel manager", t); }
|
try { _context.tunnelManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel manager", t); }
|
||||||
try { _context.tunnelDispatcher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel dispatcher", t); }
|
try { _context.tunnelDispatcher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel dispatcher", t); }
|
||||||
try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); }
|
try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); }
|
||||||
@ -1199,6 +1199,8 @@ class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
|
|||||||
getContext().statManager().addRateData("bw.sendRate", (long)getContext().bandwidthLimiter().getSendBps(), 0);
|
getContext().statManager().addRateData("bw.sendRate", (long)getContext().bandwidthLimiter().getSendBps(), 0);
|
||||||
getContext().statManager().addRateData("bw.recvRate", (long)getContext().bandwidthLimiter().getReceiveBps(), 0);
|
getContext().statManager().addRateData("bw.recvRate", (long)getContext().bandwidthLimiter().getReceiveBps(), 0);
|
||||||
|
|
||||||
|
getContext().tunnelDispatcher().updateParticipatingStats();
|
||||||
|
|
||||||
getContext().statManager().coalesceStats();
|
getContext().statManager().coalesceStats();
|
||||||
|
|
||||||
RateStat receiveRate = getContext().statManager().getRate("transport.receiveMessageSize");
|
RateStat receiveRate = getContext().statManager().getRate("transport.receiveMessageSize");
|
||||||
|
@ -139,7 +139,7 @@ class RouterThrottleImpl implements RouterThrottle {
|
|||||||
_log.warn("Probabalistically refusing tunnel request (avg=" + avg
|
_log.warn("Probabalistically refusing tunnel request (avg=" + avg
|
||||||
+ " current=" + numTunnels + ")");
|
+ " current=" + numTunnels + ")");
|
||||||
_context.statManager().addRateData("router.throttleTunnelProbTooFast", (long)(numTunnels-avg), 0);
|
_context.statManager().addRateData("router.throttleTunnelProbTooFast", (long)(numTunnels-avg), 0);
|
||||||
setTunnelStatus("Rejecting " + ((int) probAccept*100) + "% of tunnels: High number of requests");
|
setTunnelStatus("Rejecting " + (100 - (int) probAccept*100) + "% of tunnels: High number of requests");
|
||||||
return TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT;
|
return TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -211,7 +211,7 @@ class RouterThrottleImpl implements RouterThrottle {
|
|||||||
r = null;
|
r = null;
|
||||||
double messagesPerTunnel = DEFAULT_MESSAGES_PER_TUNNEL_ESTIMATE;
|
double messagesPerTunnel = DEFAULT_MESSAGES_PER_TUNNEL_ESTIMATE;
|
||||||
if (rs != null) {
|
if (rs != null) {
|
||||||
r = rs.getRate(10*60*1000);
|
r = rs.getRate(60*1000);
|
||||||
if (r != null) {
|
if (r != null) {
|
||||||
long count = r.getLastEventCount() + r.getCurrentEventCount();
|
long count = r.getLastEventCount() + r.getCurrentEventCount();
|
||||||
if (count > 0)
|
if (count > 0)
|
||||||
|
@ -27,6 +27,7 @@ public class HopConfig {
|
|||||||
private long _expiration;
|
private long _expiration;
|
||||||
private Map _options;
|
private Map _options;
|
||||||
private long _messagesProcessed;
|
private long _messagesProcessed;
|
||||||
|
private long _oldMessagesProcessed;
|
||||||
|
|
||||||
/** IV length for {@link #getReplyIV} */
|
/** IV length for {@link #getReplyIV} */
|
||||||
public static final int REPLY_IV_LENGTH = 16;
|
public static final int REPLY_IV_LENGTH = 16;
|
||||||
@ -42,6 +43,7 @@ public class HopConfig {
|
|||||||
_expiration = -1;
|
_expiration = -1;
|
||||||
_options = null;
|
_options = null;
|
||||||
_messagesProcessed = 0;
|
_messagesProcessed = 0;
|
||||||
|
_oldMessagesProcessed = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** what tunnel ID are we receiving on? */
|
/** what tunnel ID are we receiving on? */
|
||||||
@ -115,6 +117,11 @@ public class HopConfig {
|
|||||||
/** take note of a message being pumped through this tunnel */
|
/** take note of a message being pumped through this tunnel */
|
||||||
public void incrementProcessedMessages() { _messagesProcessed++; }
|
public void incrementProcessedMessages() { _messagesProcessed++; }
|
||||||
public long getProcessedMessagesCount() { return _messagesProcessed; }
|
public long getProcessedMessagesCount() { return _messagesProcessed; }
|
||||||
|
public long getRecentMessagesCount() {
|
||||||
|
long rv = _messagesProcessed - _oldMessagesProcessed;
|
||||||
|
_oldMessagesProcessed = _messagesProcessed;
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuffer buf = new StringBuffer(64);
|
StringBuffer buf = new StringBuffer(64);
|
||||||
|
@ -200,13 +200,10 @@ public class TunnelDispatcher implements Service {
|
|||||||
synchronized (_participants) {
|
synchronized (_participants) {
|
||||||
_participants.put(recvId, participant);
|
_participants.put(recvId, participant);
|
||||||
}
|
}
|
||||||
int numParticipants = 0;
|
|
||||||
synchronized (_participatingConfig) {
|
synchronized (_participatingConfig) {
|
||||||
_participatingConfig.put(recvId, cfg);
|
_participatingConfig.put(recvId, cfg);
|
||||||
numParticipants = _participatingConfig.size();
|
|
||||||
}
|
}
|
||||||
_context.messageHistory().tunnelJoined("participant", cfg);
|
_context.messageHistory().tunnelJoined("participant", cfg);
|
||||||
_context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
|
|
||||||
_context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
|
_context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
|
||||||
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
||||||
_lastParticipatingExpiration = cfg.getExpiration();
|
_lastParticipatingExpiration = cfg.getExpiration();
|
||||||
@ -224,13 +221,10 @@ public class TunnelDispatcher implements Service {
|
|||||||
synchronized (_outboundEndpoints) {
|
synchronized (_outboundEndpoints) {
|
||||||
_outboundEndpoints.put(recvId, endpoint);
|
_outboundEndpoints.put(recvId, endpoint);
|
||||||
}
|
}
|
||||||
int numParticipants = 0;
|
|
||||||
synchronized (_participatingConfig) {
|
synchronized (_participatingConfig) {
|
||||||
_participatingConfig.put(recvId, cfg);
|
_participatingConfig.put(recvId, cfg);
|
||||||
numParticipants = _participatingConfig.size();
|
|
||||||
}
|
}
|
||||||
_context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
|
_context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
|
||||||
_context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
|
|
||||||
_context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0);
|
_context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0);
|
||||||
|
|
||||||
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
||||||
@ -254,13 +248,10 @@ public class TunnelDispatcher implements Service {
|
|||||||
synchronized (_inboundGateways) {
|
synchronized (_inboundGateways) {
|
||||||
_inboundGateways.put(recvId, gw);
|
_inboundGateways.put(recvId, gw);
|
||||||
}
|
}
|
||||||
int numParticipants = 0;
|
|
||||||
synchronized (_participatingConfig) {
|
synchronized (_participatingConfig) {
|
||||||
_participatingConfig.put(recvId, cfg);
|
_participatingConfig.put(recvId, cfg);
|
||||||
numParticipants = _participatingConfig.size();
|
|
||||||
}
|
}
|
||||||
_context.messageHistory().tunnelJoined("inboundGateway", cfg);
|
_context.messageHistory().tunnelJoined("inboundGateway", cfg);
|
||||||
_context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
|
|
||||||
_context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0);
|
_context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0);
|
||||||
|
|
||||||
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
||||||
@ -338,19 +329,14 @@ public class TunnelDispatcher implements Service {
|
|||||||
_log.debug("removing " + cfg);
|
_log.debug("removing " + cfg);
|
||||||
|
|
||||||
boolean removed = false;
|
boolean removed = false;
|
||||||
int numParticipants = 0;
|
|
||||||
synchronized (_participatingConfig) {
|
synchronized (_participatingConfig) {
|
||||||
removed = (null != _participatingConfig.remove(recvId));
|
removed = (null != _participatingConfig.remove(recvId));
|
||||||
numParticipants = _participatingConfig.size();
|
|
||||||
}
|
}
|
||||||
if (!removed) {
|
if (!removed) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
_log.warn("Participating tunnel, but no longer listed in participatingConfig? " + cfg);
|
_log.warn("Participating tunnel, but no longer listed in participatingConfig? " + cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.statManager().addRateData("tunnel.participatingTunnels", numParticipants, 0);
|
|
||||||
_context.statManager().addRateData("tunnel.participatingMessageCount", cfg.getProcessedMessagesCount(), 10*60*1000);
|
|
||||||
|
|
||||||
synchronized (_participants) {
|
synchronized (_participants) {
|
||||||
removed = (null != _participants.remove(recvId));
|
removed = (null != _participants.remove(recvId));
|
||||||
}
|
}
|
||||||
@ -547,6 +533,35 @@ public class TunnelDispatcher implements Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a current estimate of usage per-participating-tunnel lifetime.
|
||||||
|
* The stats code calls this every 20s.
|
||||||
|
* This is better than waiting until the tunnel expires to update the rate,
|
||||||
|
* as we want this to be current because it's an important part of
|
||||||
|
* the throttle code.
|
||||||
|
*/
|
||||||
|
public void updateParticipatingStats() {
|
||||||
|
List participating = listParticipatingTunnels();
|
||||||
|
int size = participating.size();
|
||||||
|
long count = 0;
|
||||||
|
long tcount = 0;
|
||||||
|
long tooYoung = _context.clock().now() - 60*1000;
|
||||||
|
long tooOld = tooYoung - 9*60*1000;
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
HopConfig cfg = (HopConfig)participating.get(i);
|
||||||
|
long c = cfg.getRecentMessagesCount();
|
||||||
|
long created = cfg.getCreation();
|
||||||
|
if (created > tooYoung || created < tooOld)
|
||||||
|
continue;
|
||||||
|
tcount++;
|
||||||
|
count += c;
|
||||||
|
}
|
||||||
|
// This is called every 20s from Router.java, with 11m tunnel lifetime, so *= 33
|
||||||
|
if (tcount > 0)
|
||||||
|
count = count * 33 / tcount;
|
||||||
|
_context.statManager().addRateData("tunnel.participatingMessageCount", count, 20*1000);
|
||||||
|
_context.statManager().addRateData("tunnel.participatingTunnels", size, 0);
|
||||||
|
}
|
||||||
|
|
||||||
private static final int DROP_BASE_INTERVAL = 40 * 1000;
|
private static final int DROP_BASE_INTERVAL = 40 * 1000;
|
||||||
private static final int DROP_RANDOM_BOOST = 10 * 1000;
|
private static final int DROP_RANDOM_BOOST = 10 * 1000;
|
||||||
|
Reference in New Issue
Block a user