From 0ce4811decbfc8297181747dccd19d20530d59e5 Mon Sep 17 00:00:00 2001 From: zzz Date: Sun, 4 Aug 2019 16:28:41 +0000 Subject: [PATCH] SSU: Fix stall when higher-priority message is added to queue (ticket #2582) --- history.txt | 6 ++++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../i2p/router/transport/udp/PeerState.java | 35 +++++++++++++------ 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/history.txt b/history.txt index bcb9b10b4f..4e443dd4a1 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,9 @@ +2019-08-04 zzz + * SSU: Fix stall when higher-priority message is queued (ticket #2582) + +2019-08-03 zzz + * Transport: Allow local addresses when configured + 2019-08-02 zzz * Router: Fix Bloom filter false positives diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 4e28a2c8d2..c10128fe62 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 5; + public final static long BUILD = 6; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 81ab8b7a81..4b71002cab 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -1482,7 +1482,10 @@ public class PeerState { _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId()); int rv = 0; // will never fail for CDPQ - boolean fail = !_outboundQueue.offer(state); + boolean fail; + synchronized (_outboundQueue) { + fail = !_outboundQueue.offer(state); + } /**** synchronized (_outboundMessages) { rv = _outboundMessages.size() + 1; @@ -1555,7 +1558,9 @@ public class PeerState { _outboundMessages.clear(); } //_outboundQueue.drainAllTo(tempList); - _outboundQueue.drainTo(tempList); + synchronized (_outboundQueue) { + _outboundQueue.drainTo(tempList); + } for (OutboundMessageState oms : tempList) { _transport.failed(oms, false); } @@ -1710,19 +1715,27 @@ public class PeerState { // If so, pull it off, put it in _outbundMessages, test // again for bandwidth if necessary, and return it. OutboundMessageState state; - while ((state = _outboundQueue.peek()) != null && - ShouldSend.YES == locked_shouldSend(state)) { - // we could get a different state, or null, when we poll, - // due to AQM drops, so we test again if necessary - OutboundMessageState dequeuedState = _outboundQueue.poll(); - if (dequeuedState != null) { - _outboundMessages.add(dequeuedState); - if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(state)) { + synchronized (_outboundQueue) { + while ((state = _outboundQueue.peek()) != null && + ShouldSend.YES == locked_shouldSend(state)) { + // This is guaranted to be the same as what we got in peek(), + // due to locking and because we aren't using the dropping CDPBQ. + // If we do switch to CDPBQ, + // we could get a different state, or null, when we poll, + // due to AQM drops, so we test again if necessary + OutboundMessageState dequeuedState = _outboundQueue.poll(); + if (dequeuedState != null) { + _outboundMessages.add(dequeuedState); + // TODO if we switch to CDPBQ, see ticket #2582 + //if (dequeuedState != state) { + // // ignore result, always send? + // locked_shouldSend(dequeuedState); + //} if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId()); if (rv == null) rv = new ArrayList(MAX_ALLOCATE_SEND); - rv.add(state); + rv.add(dequeuedState); if (rv.size() >= MAX_ALLOCATE_SEND) return rv; }