diff --git a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java index f584b4763..83220a702 100644 --- a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java +++ b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java @@ -47,8 +47,19 @@ public class OutboundMessageRegistry { _log.log(Log.CRIT, buf.toString()); } + /** + * Retrieve all messages that are waiting for the specified message. In + * addition, those matches may include instructions to either continue or not + * continue waiting for further replies - if it should continue, the matched + * message remains in the registry, but if it shouldn't continue, the matched + * message is removed from the registry. + * + * @param message Payload received that may be a reply to something we sent + * @return List of OutNetMessage describing messages that were waiting for + * the payload + */ public List getOriginalMessages(I2NPMessage message) { - HashSet matches = new HashSet(4); + ArrayList matches = new ArrayList(2); long beforeSync = _context.clock().now(); Map messages = null; @@ -62,7 +73,7 @@ public class OutboundMessageRegistry { long afterSync1 = _context.clock().now(); - ArrayList matchedRemove = new ArrayList(32); + ArrayList matchedRemove = null; // new ArrayList(32); for (Iterator iter = messages.keySet().iterator(); iter.hasNext(); ) { Long exp = (Long)iter.next(); OutNetMessage msg = (OutNetMessage)messages.get(exp); @@ -82,7 +93,8 @@ public class OutboundMessageRegistry { if (isMatch) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Selector matches [" + selector); - matches.add(msg); + if (!matches.contains(msg)) + matches.add(msg); long beforeCon = _context.clock().now(); boolean continueMatching = selector.continueMatching(); long afterCon = _context.clock().now(); @@ -102,6 +114,8 @@ public class OutboundMessageRegistry { if (_log.shouldLog(Log.DEBUG)) _log.debug("Stop matching selector " + selector + " for message " + msg.getMessageType()); + if (matchedRemove == null) + matchedRemove = new ArrayList(4); matchedRemove.add(exp); } } else { @@ -111,25 +125,8 @@ public class OutboundMessageRegistry { } long afterSearch = _context.clock().now(); - for (Iterator iter = matchedRemove.iterator(); iter.hasNext(); ) { - Long expiration = (Long)iter.next(); - OutNetMessage m = null; - long before = _context.clock().now(); - synchronized (_pendingMessages) { - m = (OutNetMessage)_pendingMessages.remove(expiration); - } - long diff = _context.clock().now() - before; - if ( (diff > 500) && (_log.shouldLog(Log.WARN)) ) - _log.warn("Took too long syncing on remove (" + diff + "ms"); - - if (m != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing message with selector " - + m.getReplySelector().getClass().getName() - + " :" + m.getReplySelector().toString()); - } - } - + doRemove(matchedRemove); + long delay = _context.clock().now() - beforeSync; long search = afterSearch - afterSync1; long sync = afterSync1 - beforeSync; @@ -141,11 +138,39 @@ public class OutboundMessageRegistry { _log.log(level, "getMessages took " + delay + "ms with search time of " + search + "ms (match: " + matchTime + "ms, continue: " + continueTime + "ms, #: " + numMessages + ") and sync time of " - + sync + "ms for " + matchedRemove.size() + " removed, " - + matches.size() + " matches"); + + sync + "ms for " + (matchedRemove == null ? 0 : matchedRemove.size()) + + " removed, " + matches.size() + " matches"); } - return new ArrayList(matches); + return matches; + } + + /** + * Remove the specified messages from the pending list + * + * @param matchedRemove expiration (Long) of the pending message to remove + */ + private void doRemove(List matchedRemove) { + if (matchedRemove != null) { + for (int i = 0; i < matchedRemove.size(); i++) { + Long expiration = (Long)matchedRemove.get(i); + OutNetMessage m = null; + long before = _context.clock().now(); + synchronized (_pendingMessages) { + m = (OutNetMessage)_pendingMessages.remove(expiration); + } + long diff = _context.clock().now() - before; + if ( (diff > 500) && (_log.shouldLog(Log.WARN)) ) + _log.warn("Took too long syncing on remove (" + diff + "ms"); + + if (m != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Removing message with selector " + + m.getReplySelector().getClass().getName() + + " :" + m.getReplySelector().toString()); + } + } + } } public void registerPending(OutNetMessage msg) { @@ -307,23 +332,28 @@ public class OutboundMessageRegistry { */ private List removeMessages() { long now = OutboundMessageRegistry.this._context.clock().now(); - List removedMessages = new ArrayList(8); - List expirationsToRemove = new ArrayList(8); + List removedMessages = new ArrayList(2); + List expirationsToRemove = null; synchronized (_pendingMessages) { for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext();) { Long expiration = (Long)iter.next(); if (expiration.longValue() < now) { + if (expirationsToRemove == null) + expirationsToRemove = new ArrayList(8); expirationsToRemove.add(expiration); } else { // its sorted break; } } - for (int i = 0; i < expirationsToRemove.size(); i++) { - Long expiration = (Long)expirationsToRemove.get(i); - OutNetMessage msg = (OutNetMessage)_pendingMessages.remove(expiration); - if (msg != null) - removedMessages.add(msg); + + if (expirationsToRemove != null) { + for (int i = 0; i < expirationsToRemove.size(); i++) { + Long expiration = (Long)expirationsToRemove.get(i); + OutNetMessage msg = (OutNetMessage)_pendingMessages.remove(expiration); + if (msg != null) + removedMessages.add(msg); + } } } if (_log.shouldLog(Log.DEBUG))