2006-03-30 jrandom

* Substantially reduced the lock contention in the message registry (a
      major hotspot that can choke most threads).  Also reworked the locking
      so we don't need per-message timer events
    * No need to have additional per-peer message clearing, as they are
      either unregistered individually or expired.
    * Include some of the more transient tunnel throttling
This commit is contained in:
jrandom
2006-03-30 07:26:43 +00:00
committed by zzz
parent 8b707e569f
commit 15e6c27c04
6 changed files with 131 additions and 367 deletions

View File

@ -1,4 +1,12 @@
$Id: history.txt,v 1.439 2006/03/25 18:50:51 jrandom Exp $
$Id: history.txt,v 1.440 2006/03/26 18:23:54 jrandom Exp $
2006-03-30 jrandom
* Substantially reduced the lock contention in the message registry (a
major hotspot that can choke most threads). Also reworked the locking
so we don't need per-message timer events
* No need to have additional per-peer message clearing, as they are
either unregistered individually or expired.
* Include some of the more transient tunnel throttling
* 2006-03-26 0.6.1.13 released

View File

@ -248,7 +248,7 @@ class RouterThrottleImpl implements RouterThrottle {
*/
private boolean allowTunnel(double bytesAllocated, int numTunnels) {
int maxKBps = Math.min(_context.bandwidthLimiter().getOutboundKBytesPerSecond(), _context.bandwidthLimiter().getInboundKBytesPerSecond());
int used1s = 0; //get1sRate(_context); // dont throttle on the 1s rate, its too volatile
int used1s = get1sRate(_context); // dont throttle on the 1s rate, its too volatile
int used1m = get1mRate(_context);
int used5m = 0; //get5mRate(_context); // don't throttle on the 5m rate, as that'd hide available bandwidth
int used = Math.max(Math.max(used1s, used1m), used5m);

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.380 $ $Date: 2006/03/25 18:50:48 $";
public final static String ID = "$Revision: 1.381 $ $Date: 2006/03/26 18:23:52 $";
public final static String VERSION = "0.6.1.13";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -84,7 +84,7 @@ public class Shitlist {
_context.netDb().fail(peer);
//_context.tunnelManager().peerFailed(peer);
_context.messageRegistry().peerFailed(peer);
//_context.messageRegistry().peerFailed(peer);
if (!wasAlready)
_context.messageHistory().shitlist(peer, reason);
return wasAlready;

View File

@ -47,7 +47,7 @@ public class GetBidsJob extends JobImpl {
if (context.shitlist().isShitlisted(to)) {
if (log.shouldLog(Log.WARN))
log.warn("Attempt to send a message to a shitlisted peer - " + to);
context.messageRegistry().peerFailed(to);
//context.messageRegistry().peerFailed(to);
fail(context, msg);
return;
}

View File

@ -10,14 +10,8 @@ package net.i2p.router.transport;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.*;
import net.i2p.data.Hash;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Job;
import net.i2p.router.MessageSelector;
@ -29,29 +23,25 @@ import net.i2p.util.SimpleTimer;
public class OutboundMessageRegistry {
private Log _log;
/** Expiration date (Long) to OutNetMessage */
private TreeMap _pendingMessages;
/** list of currently active MessageSelector instances */
private List _selectors;
/** map of active MessageSelector to the OutNetMessage causing it (for quick removal) */
private Map _selectorToMessage;
/** set of active OutNetMessage (for quick removal and selector fetching) */
private Set _activeMessages;
private CleanupTask _cleanupTask;
private RouterContext _context;
private final static long CLEANUP_DELAY = 1000*5; // how often to expire pending unreplied messages
public OutboundMessageRegistry(RouterContext context) {
_context = context;
_log = _context.logManager().getLog(OutboundMessageRegistry.class);
_pendingMessages = new TreeMap();
//_context.jobQueue().addJob(new CleanupPendingMessagesJob());
_selectors = new ArrayList(64);
_selectorToMessage = new HashMap(64);
_activeMessages = new HashSet(64);
_cleanupTask = new CleanupTask();
}
public void shutdown() {
if (_log.shouldLog(Log.WARN)) {
StringBuffer buf = new StringBuffer(1024);
buf.append("Pending messages: ").append(_pendingMessages.size()).append("\n");
for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) {
buf.append(iter.next().toString()).append("\n\t");
}
_log.log(Log.WARN, buf.toString());
}
}
public void shutdown() {}
/**
* Retrieve all messages that are waiting for the specified message. In
@ -65,103 +55,54 @@ public class OutboundMessageRegistry {
* the payload
*/
public List getOriginalMessages(I2NPMessage message) {
ArrayList matches = new ArrayList(2);
ArrayList matchedSelectors = null;
ArrayList removedSelectors = null;
long beforeSync = _context.clock().now();
Map messages = null;
long matchTime = 0;
long continueTime = 0;
int numMessages = 0;
long afterSync1 = 0;
long afterSearch = 0;
int matchedRemoveCount = 0;
StringBuffer slow = null; // new StringBuffer(256);
synchronized (_pendingMessages) {
messages = _pendingMessages; //(Map)_pendingMessages.clone();
numMessages = messages.size();
afterSync1 = _context.clock().now();
for (Iterator iter = messages.keySet().iterator(); iter.hasNext(); ) {
Long exp = (Long)iter.next();
OutNetMessage msg = (OutNetMessage)messages.get(exp);
MessageSelector selector = msg.getReplySelector();
if (selector != null) {
long before = _context.clock().now();
boolean isMatch = selector.isMatch(message);
long after = _context.clock().now();
long diff = after-before;
if (diff > 100) {
if (_log.shouldLog(Log.WARN))
_log.warn("Matching with selector took too long (" + diff + "ms) : "
+ selector.getClass().getName());
if (slow == null) slow = new StringBuffer(256);
slow.append(selector.getClass().getName()).append(": ");
slow.append(diff).append(" ");
}
matchTime += diff;
synchronized (_selectors) {
for (int i = 0; i < _selectors.size(); i++) {
MessageSelector sel = (MessageSelector)_selectors.get(i);
boolean isMatch = sel.isMatch(message);
if (isMatch) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Selector matches [" + selector);
if (!matches.contains(msg))
matches.add(msg);
long beforeCon = _context.clock().now();
boolean continueMatching = selector.continueMatching();
long afterCon = _context.clock().now();
long diffCon = afterCon - beforeCon;
if (diffCon > 100) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error continueMatching on a match took too long ("
+ diffCon + "ms) : " + selector.getClass().getName());
if (matchedSelectors == null) matchedSelectors = new ArrayList(1);
matchedSelectors.add(sel);
if (!sel.continueMatching()) {
if (removedSelectors == null) removedSelectors = new ArrayList(1);
removedSelectors.add(sel);
_selectors.remove(i);
i--;
}
}
}
}
continueTime += diffCon;
if (continueMatching) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Continue matching");
// noop
List rv = null;
if (matchedSelectors != null) {
rv = new ArrayList(matchedSelectors.size());
for (int i = 0; i < matchedSelectors.size(); i++) {
MessageSelector sel = (MessageSelector)matchedSelectors.get(i);
boolean removed = false;
OutNetMessage msg = null;
synchronized (_selectorToMessage) {
if ( (removedSelectors != null) && (removedSelectors.contains(sel)) ) {
msg = (OutNetMessage)_selectorToMessage.remove(sel);
removed = true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stop matching selector " + selector + " for message "
+ msg.getMessageType());
// i give in mihi, i'll use iter.remove just this once ;)
// (TreeMap supports it, and this synchronized block is a hotspot)
iter.remove();
matchedRemoveCount++;
msg = (OutNetMessage)_selectorToMessage.get(sel);
}
if (msg != null)
rv.add(msg);
}
if (removed && msg != null) {
synchronized (_activeMessages) {
_activeMessages.remove(msg);
}
}
}
} else {
//_log.debug("Selector does not match [" + selector + "]");
}
}
}
afterSearch = _context.clock().now();
rv = Collections.EMPTY_LIST;
}
long delay = _context.clock().now() - beforeSync;
long search = afterSearch - afterSync1;
long sync = afterSync1 - beforeSync;
int level = Log.DEBUG;
if (delay > 1000)
level = Log.ERROR;
if (_log.shouldLog(level)) {
StringBuffer buf = new StringBuffer(1024);
buf.append("getMessages took ").append(delay).append("ms with search time of");
buf.append(search).append("ms (match: ").append(matchTime).append("ms, continue: ");
buf.append(continueTime).append("ms, #: ").append(numMessages).append(") and sync time of ");
buf.append(sync).append("ms for ");
buf.append(matchedRemoveCount);
buf.append(" removed, ").append(matches.size()).append(" matches: slow = ");
if (slow != null)
buf.append(slow.toString());
_log.log(level, buf.toString());
}
return matches;
return rv;
}
public OutNetMessage registerPending(MessageSelector replySelector, ReplyJob onReply, Job onTimeout, int timeoutMs) {
@ -175,270 +116,85 @@ public class OutboundMessageRegistry {
return msg;
}
public void registerPending(OutNetMessage msg) {
registerPending(msg, false);
}
public void registerPending(OutNetMessage msg) { registerPending(msg, false); }
public void registerPending(OutNetMessage msg, boolean allowEmpty) {
if (msg == null)
throw new IllegalArgumentException("Null OutNetMessage specified? wtf");
if (!allowEmpty) {
if (msg.getMessage() == null)
if ( (!allowEmpty) && (msg.getMessage() == null) )
throw new IllegalArgumentException("OutNetMessage doesn't contain an I2NPMessage? wtf");
}
MessageSelector sel = msg.getReplySelector();
if (sel == null) throw new IllegalArgumentException("No reply selector? wtf");
long beforeSync = _context.clock().now();
long afterSync1 = 0;
long afterDone = 0;
try {
OutNetMessage oldMsg = null;
long l = msg.getExpiration();
synchronized (_pendingMessages) {
if (_pendingMessages.containsValue(msg)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not adding an already pending message: " + msg,
new Exception("Duplicate message registration"));
return;
boolean alreadyPending = false;
synchronized (_activeMessages) {
if (!_activeMessages.add(msg))
return; // dont add dups
}
synchronized (_selectorToMessage) { _selectorToMessage.put(sel, msg); }
synchronized (_selectors) { _selectors.add(sel); }
while (_pendingMessages.containsKey(new Long(l)))
l++;
_pendingMessages.put(new Long(l), msg);
}
afterSync1 = _context.clock().now();
// this may get orphaned if the message is matched or explicitly
// removed, but its cheap enough to do an extra remove on the map
// that to poll the list periodically
SimpleTimer.getInstance().addEvent(new CleanupExpiredTask(l), l - _context.clock().now());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Register pending: " + msg.getReplySelector().getClass().getName()
+ " for " + msg.getMessage() + ": "
+ msg.getReplySelector().toString(), new Exception("Register pending"));
afterDone = _context.clock().now();
} finally {
long delay = _context.clock().now() - beforeSync;
long sync1 = afterSync1 - beforeSync;
long done = afterDone - afterSync1;
String warn = delay + "ms (sync = " + sync1 + "ms, done = " + done + "ms)";
if ( (delay > 1000) && (_log.shouldLog(Log.WARN)) ) {
_log.error("Synchronizing in the registry.register took too long! " + warn);
//_context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(),
// msg.getMessage().getClass().getName(),
// "RegisterPending took too long: " + warn);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Synchronizing in the registry.register was quick: " + warn);
}
}
//_log.debug("* Register called of " + msg + "\n\nNow pending are: " + renderStatusHTML(), new Exception("who registered a new one?"));
_cleanupTask.scheduleExpiration(sel);
}
public void unregisterPending(OutNetMessage msg) {
long beforeSync = _context.clock().now();
try {
synchronized (_pendingMessages) {
if (_pendingMessages.containsValue(msg)) {
Long found = null;
for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext();) {
Long exp = (Long)iter.next();
Object val = _pendingMessages.get(exp);
if (val.equals(msg)) {
found = exp;
break;
}
MessageSelector sel = msg.getReplySelector();
// remember, order matters
synchronized (_selectors) { _selectors.add(sel); }
synchronized (_selectorToMessage) { _selectorToMessage.put(sel, msg); }
synchronized (_activeMessages) { _activeMessages.remove(msg); }
}
if (found != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unregistered message " + msg.getReplySelector()
+ ": " + msg, new Exception("Who unregistered?"));
_pendingMessages.remove(found);
} else {
_log.error("Arg, couldn't find the message that we... thought we could find?",
new Exception("WTF"));
}
}
}
} finally {
long delay = _context.clock().now() - beforeSync;
String warn = delay + "ms";
if ( (delay > 1000) && (_log.shouldLog(Log.WARN)) ) {
_log.warn("Synchronizing in the registry.unRegister took too long! " + warn);
_context.messageHistory().messageProcessingError(msg.getMessageId(), msg.getMessageType(), "Unregister took too long: " + warn);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Synchronizing in the registry.unRegister was quick: " + warn);
}
}
}
public void renderStatusHTML(Writer out) throws IOException {}
public void peerFailed(Hash peer) {
List failed = null;
int numFailed = 0;
synchronized (_pendingMessages) {
for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) {
OutNetMessage msg = (OutNetMessage)iter.next();
if (msg.getTarget() != null) {
Hash to = msg.getTarget().getIdentity().calculateHash();
if (to.equals(peer)) {
if (failed == null)
failed = new ArrayList(4);
failed.add(msg);
iter.remove();
numFailed++;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer failed: " + peer.toBase64().substring(0,6)
+ " but not killing a message to "
+ to.toBase64().substring(0,6));
}
}
}
}
if (failed != null) {
for (int i = 0; i < failed.size(); i++) {
OutNetMessage msg = (OutNetMessage)failed.get(i);
msg.discardData();
if (msg.getOnFailedSendJob() != null)
_context.jobQueue().addJob(msg.getOnFailedSendJob());
}
}
if (_log.shouldLog(Log.WARN))
_log.warn("Peer failed: " + peer.toBase64().substring(0,6) + " killing " + numFailed);
}
public void renderStatusHTML(Writer out) throws IOException {
StringBuffer buf = new StringBuffer(8192);
buf.append("<h2>Pending messages</h2>\n");
Map msgs = null;
synchronized (_pendingMessages) {
msgs = (Map)_pendingMessages.clone();
}
buf.append("<ul>");
for (Iterator iter = msgs.keySet().iterator(); iter.hasNext();) {
Long exp = (Long)iter.next();
OutNetMessage msg = (OutNetMessage)msgs.get(exp);
buf.append("<li>").append(msg.getMessageType());
buf.append(": expiring on ").append(new Date(exp.longValue()));
if (msg.getTarget() != null)
buf.append(" targetting ").append(msg.getTarget().getIdentity().getHash());
if (msg.getReplySelector() != null)
buf.append(" with reply selector ").append(msg.getReplySelector().toString());
else
buf.append(" with NO reply selector? WTF!");
buf.append("</li>\n");
}
buf.append("</ul>");
out.write(buf.toString());
out.flush();
}
private class CleanupExpiredTask implements SimpleTimer.TimedEvent {
private long _expiration;
public CleanupExpiredTask(long expiration) {
_expiration = expiration;
private class CleanupTask implements SimpleTimer.TimedEvent {
private List _removing;
private long _nextExpire;
public CleanupTask() {
_removing = new ArrayList(4);
_nextExpire = -1;
}
public void timeReached() {
long now = _context.clock().now();
synchronized (_selectors) {
for (int i = 0; i < _selectors.size(); i++) {
MessageSelector sel = (MessageSelector)_selectors.get(i);
long expiration = sel.getExpiration();
if (expiration <= now) {
_removing.add(sel);
_selectors.remove(i);
i--;
} else if (expiration < _nextExpire || _nextExpire < now) {
_nextExpire = expiration;
}
}
}
if (_removing.size() > 0) {
for (int i = 0; i < _removing.size(); i++) {
MessageSelector sel = (MessageSelector)_removing.get(i);
OutNetMessage msg = null;
synchronized (_pendingMessages) {
msg = (OutNetMessage)_pendingMessages.remove(new Long(_expiration));
synchronized (_selectorToMessage) {
msg = (OutNetMessage)_selectorToMessage.remove(sel);
}
if (msg != null) {
_context.messageHistory().replyTimedOut(msg);
synchronized (_activeMessages) {
_activeMessages.remove(msg);
}
Job fail = msg.getOnFailedReplyJob();
if (fail != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing message with selector " + msg.getReplySelector()
+ ": " + msg.getMessageType()
+ " and firing fail job: " + fail.getClass().getName());
if (fail != null)
_context.jobQueue().addJob(fail);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing message with selector " + msg.getReplySelector()
+ " and not firing any job");
}
}
}
_removing.clear();
}
/**
* Cleanup any messages that were pending replies but have expired
*
*/
/*
private class CleanupPendingMessagesJob extends JobImpl {
public CleanupPendingMessagesJob() {
super(OutboundMessageRegistry.this._context);
if (_nextExpire <= now)
_nextExpire = now + 10*1000;
SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now);
}
public String getName() { return "Cleanup any messages that timed out"; }
public void runJob() {
List removed = removeMessages();
RouterContext ctx = OutboundMessageRegistry.this._context;
for (int i = 0; i < removed.size(); i++) {
OutNetMessage msg = (OutNetMessage)removed.get(i);
if (msg != null) {
_context.messageHistory().replyTimedOut(msg);
Job fail = msg.getOnFailedReplyJob();
if (fail != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing message with selector " + msg.getReplySelector()
+ ": " + msg.getMessageType()
+ " and firing fail job: " + fail.getClass().getName());
_context.jobQueue().addJob(fail);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing message with selector " + msg.getReplySelector()
+ " and not firing any job");
public void scheduleExpiration(MessageSelector sel) {
long now = _context.clock().now();
if ( (_nextExpire <= now) || (sel.getExpiration() < _nextExpire) ) {
_nextExpire = sel.getExpiration();
SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now);
}
}
}
requeue(CLEANUP_DELAY);
}
/**
* Remove any messages whose expirations are in the past
*
* @return list of OutNetMessage objects that have expired
*/ /*
private List removeMessages() {
long now = OutboundMessageRegistry.this._context.clock().now();
List removedMessages = new ArrayList(2);
List expirationsToRemove = null;
synchronized (_pendingMessages) {
for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext();) {
Long expiration = (Long)iter.next();
if (expiration.longValue() < now) {
if (expirationsToRemove == null)
expirationsToRemove = new ArrayList(8);
expirationsToRemove.add(expiration);
} else {
// its sorted
break;
}
}
if (expirationsToRemove != null) {
for (int i = 0; i < expirationsToRemove.size(); i++) {
Long expiration = (Long)expirationsToRemove.get(i);
OutNetMessage msg = (OutNetMessage)_pendingMessages.remove(expiration);
if (msg != null)
removedMessages.add(msg);
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removed " + removedMessages.size() + " messages");
return removedMessages;
}
}
*/
}