forked from I2P_Developers/i2p.i2p
Remove one global lock in OutboundMessageRegistry.
This isn't the cause of the ISJ deadlocks though.
This commit is contained in:
@ -25,6 +25,7 @@ import net.i2p.router.MessageSelector;
|
||||
import net.i2p.router.OutNetMessage;
|
||||
import net.i2p.router.ReplyJob;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.util.ConcurrentHashSet;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer;
|
||||
|
||||
@ -37,7 +38,14 @@ public class OutboundMessageRegistry {
|
||||
private final List<MessageSelector> _selectors;
|
||||
/** map of active MessageSelector to either an OutNetMessage or a List of OutNetMessages causing it (for quick removal) */
|
||||
private final Map<MessageSelector, Object> _selectorToMessage;
|
||||
/** set of active OutNetMessage (for quick removal and selector fetching) */
|
||||
/**
|
||||
* set of active OutNetMessage (for quick removal and selector fetching)
|
||||
* !! Really? seems only for dup detection in registerPending().
|
||||
* Changed to concurrent, but it could perhaps be removed completely,
|
||||
* It would seem difficult to add a dup since every OutNetMessage is different,
|
||||
* and it's generally instantiated just before ctx.outNetMessagePool().add().
|
||||
* But in TransportImpl.afterSend() it does requeue a previous ONM if allowRequeue=true.
|
||||
*/
|
||||
private final Set<OutNetMessage> _activeMessages;
|
||||
private final CleanupTask _cleanupTask;
|
||||
private final RouterContext _context;
|
||||
@ -47,7 +55,7 @@ public class OutboundMessageRegistry {
|
||||
_log = _context.logManager().getLog(OutboundMessageRegistry.class);
|
||||
_selectors = new ArrayList(64);
|
||||
_selectorToMessage = new HashMap(64);
|
||||
_activeMessages = new HashSet(64);
|
||||
_activeMessages = new ConcurrentHashSet(64);
|
||||
_cleanupTask = new CleanupTask();
|
||||
}
|
||||
|
||||
@ -63,9 +71,7 @@ public class OutboundMessageRegistry {
|
||||
}
|
||||
// Calling the fail job for every active message would
|
||||
// be way too much at shutdown/restart, right?
|
||||
synchronized (_activeMessages) {
|
||||
_activeMessages.clear();
|
||||
}
|
||||
_activeMessages.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,6 +90,10 @@ public class OutboundMessageRegistry {
|
||||
*
|
||||
* This is called only by InNetMessagePool.
|
||||
*
|
||||
* TODO this calls isMatch() in the selectors from inside the lock, which
|
||||
* can lead to deadlocks if the selector does too much in isMatch().
|
||||
* Remove the lock if possible.
|
||||
*
|
||||
* @param message Payload received that may be a reply to something we sent
|
||||
* @return non-null List of OutNetMessage describing messages that were waiting for
|
||||
* the payload
|
||||
@ -97,7 +107,7 @@ public class OutboundMessageRegistry {
|
||||
//for (Iterator<MessageSelector> iter = _selectors.iterator(); iter.hasNext(); ) {
|
||||
// MessageSelector sel = iter.next();
|
||||
for (int i = 0; i < _selectors.size(); i++) {
|
||||
MessageSelector sel = (MessageSelector)_selectors.get(i);
|
||||
MessageSelector sel = _selectors.get(i);
|
||||
boolean isMatch = sel.isMatch(message);
|
||||
if (isMatch) {
|
||||
if (matchedSelectors == null) matchedSelectors = new ArrayList(1);
|
||||
@ -141,13 +151,9 @@ public class OutboundMessageRegistry {
|
||||
}
|
||||
if (removed) {
|
||||
if (msg != null) {
|
||||
synchronized (_activeMessages) {
|
||||
_activeMessages.remove(msg);
|
||||
}
|
||||
_activeMessages.remove(msg);
|
||||
} else if (msgs != null) {
|
||||
synchronized (_activeMessages) {
|
||||
_activeMessages.removeAll(msgs);
|
||||
}
|
||||
_activeMessages.removeAll(msgs);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -195,11 +201,9 @@ public class OutboundMessageRegistry {
|
||||
MessageSelector sel = msg.getReplySelector();
|
||||
if (sel == null) throw new IllegalArgumentException("No reply selector? wtf");
|
||||
|
||||
boolean alreadyPending = false;
|
||||
synchronized (_activeMessages) {
|
||||
if (!_activeMessages.add(msg))
|
||||
return; // dont add dups
|
||||
}
|
||||
if (!_activeMessages.add(msg))
|
||||
return; // dont add dups
|
||||
|
||||
synchronized (_selectorToMessage) {
|
||||
Object oldMsg = _selectorToMessage.put(sel, msg);
|
||||
if (oldMsg != null) {
|
||||
@ -246,7 +250,7 @@ public class OutboundMessageRegistry {
|
||||
}
|
||||
if (!stillActive)
|
||||
synchronized (_selectors) { _selectors.remove(sel); }
|
||||
synchronized (_activeMessages) { _activeMessages.remove(msg); }
|
||||
_activeMessages.remove(msg);
|
||||
}
|
||||
|
||||
/** @deprecated unused */
|
||||
@ -293,16 +297,12 @@ public class OutboundMessageRegistry {
|
||||
}
|
||||
}
|
||||
if (msg != null) {
|
||||
synchronized (_activeMessages) {
|
||||
_activeMessages.remove(msg);
|
||||
}
|
||||
_activeMessages.remove(msg);
|
||||
Job fail = msg.getOnFailedReplyJob();
|
||||
if (fail != null)
|
||||
_context.jobQueue().addJob(fail);
|
||||
} else if (msgs != null) {
|
||||
synchronized (_activeMessages) {
|
||||
_activeMessages.removeAll(msgs);
|
||||
}
|
||||
_activeMessages.removeAll(msgs);
|
||||
for (OutNetMessage m : msgs) {
|
||||
Job fail = m.getOnFailedReplyJob();
|
||||
if (fail != null)
|
||||
|
Reference in New Issue
Block a user