SSU: Fix stall when higher-priority message is added to queue (ticket #2582)

This commit is contained in:
zzz
2019-08-04 16:28:41 +00:00
parent d3e3ec4d35
commit 0ce4811dec
3 changed files with 31 additions and 12 deletions

View File

@ -1,3 +1,9 @@
2019-08-04 zzz
* SSU: Fix stall when higher-priority message is queued (ticket #2582)
2019-08-03 zzz
* Transport: Allow local addresses when configured
2019-08-02 zzz
* Router: Fix Bloom filter false positives

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 5;
public final static long BUILD = 6;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -1482,7 +1482,10 @@ public class PeerState {
_log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
int rv = 0;
// will never fail for CDPQ
boolean fail = !_outboundQueue.offer(state);
boolean fail;
synchronized (_outboundQueue) {
fail = !_outboundQueue.offer(state);
}
/****
synchronized (_outboundMessages) {
rv = _outboundMessages.size() + 1;
@ -1555,7 +1558,9 @@ public class PeerState {
_outboundMessages.clear();
}
//_outboundQueue.drainAllTo(tempList);
_outboundQueue.drainTo(tempList);
synchronized (_outboundQueue) {
_outboundQueue.drainTo(tempList);
}
for (OutboundMessageState oms : tempList) {
_transport.failed(oms, false);
}
@ -1710,19 +1715,27 @@ public class PeerState {
// If so, pull it off, put it in _outbundMessages, test
// again for bandwidth if necessary, and return it.
OutboundMessageState state;
while ((state = _outboundQueue.peek()) != null &&
ShouldSend.YES == locked_shouldSend(state)) {
// we could get a different state, or null, when we poll,
// due to AQM drops, so we test again if necessary
OutboundMessageState dequeuedState = _outboundQueue.poll();
if (dequeuedState != null) {
_outboundMessages.add(dequeuedState);
if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(state)) {
synchronized (_outboundQueue) {
while ((state = _outboundQueue.peek()) != null &&
ShouldSend.YES == locked_shouldSend(state)) {
// This is guaranted to be the same as what we got in peek(),
// due to locking and because we aren't using the dropping CDPBQ.
// If we do switch to CDPBQ,
// we could get a different state, or null, when we poll,
// due to AQM drops, so we test again if necessary
OutboundMessageState dequeuedState = _outboundQueue.poll();
if (dequeuedState != null) {
_outboundMessages.add(dequeuedState);
// TODO if we switch to CDPBQ, see ticket #2582
//if (dequeuedState != state) {
// // ignore result, always send?
// locked_shouldSend(dequeuedState);
//}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
if (rv == null)
rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
rv.add(state);
rv.add(dequeuedState);
if (rv.size() >= MAX_ALLOCATE_SEND)
return rv;
}