SSU: Sync/notify improvements (ticket #2260)

This commit is contained in:
zzz
2018-07-10 17:14:14 +00:00
parent 30fc9544fe
commit 02669fafde
4 changed files with 17 additions and 18 deletions

View File

@ -2,6 +2,7 @@
* Installer (ticket #1864): * Installer (ticket #1864):
- Fix wrapper selection on Windows 10 - Fix wrapper selection on Windows 10
- Add support for IzPack 5 - Add support for IzPack 5
* SSU: Sync/notify improvements (ticket #2260)
2018-07-08 zzz 2018-07-08 zzz
* i2psnark: Add comment icon (ticket #2278) * i2psnark: Add comment icon (ticket #2278)

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 8; public final static long BUILD = 9;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@ -260,7 +260,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
// By calling add(), this also is a failsafe against possible // By calling add(), this also is a failsafe against possible
// races in OutboundMessageFragments. // races in OutboundMessageFragments.
if (newAck && from.getOutboundMessageCount() > 0) if (newAck && from.getOutboundMessageCount() > 0)
_outbound.add(from); _outbound.add(from, 0);
return rv; return rv;
} }

View File

@ -51,11 +51,6 @@ class OutboundMessageFragments {
*/ */
private Iterator<PeerState> _iterator; private Iterator<PeerState> _iterator;
/**
* Avoid sync in add() if possible (not 100% reliable)
*/
private volatile boolean _isWaiting;
private volatile boolean _alive; private volatile boolean _alive;
private final PacketBuilder _builder; private final PacketBuilder _builder;
@ -104,7 +99,7 @@ class OutboundMessageFragments {
_alive = false; _alive = false;
_activePeers.clear(); _activePeers.clear();
synchronized (_activePeers) { synchronized (_activePeers) {
_activePeers.notifyAll(); _activePeers.notify();
} }
} }
@ -165,7 +160,7 @@ class OutboundMessageFragments {
// will throw IAE if peer == null // will throw IAE if peer == null
OutboundMessageState state = new OutboundMessageState(_context, msg, peer); OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
peer.add(state); peer.add(state);
add(peer); add(peer, state.fragmentSize(0));
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
_transport.failed(msg, "Peer disconnected quickly"); _transport.failed(msg, "Peer disconnected quickly");
return; return;
@ -182,7 +177,7 @@ class OutboundMessageFragments {
if (peer == null) if (peer == null)
throw new RuntimeException("null peer for " + state); throw new RuntimeException("null peer for " + state);
peer.add(state); peer.add(state);
add(peer); add(peer, state.fragmentSize(0));
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0); //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
} }
@ -195,10 +190,15 @@ class OutboundMessageFragments {
if (peer == null) if (peer == null)
throw new RuntimeException("null peer"); throw new RuntimeException("null peer");
int sz = states.size(); int sz = states.size();
int min = peer.fragmentSize();
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
peer.add(states.get(i)); OutboundMessageState state = states.get(i);
peer.add(state);
int fsz = state.fragmentSize(0);
if (fsz < min)
min = fsz;
} }
add(peer); add(peer, min);
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0); //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
} }
@ -211,10 +211,10 @@ class OutboundMessageFragments {
* There are larger chances of adding the PeerState "behind" where * There are larger chances of adding the PeerState "behind" where
* the iterator is now... but these issues are the same as before concurrentification. * the iterator is now... but these issues are the same as before concurrentification.
* *
* @param the minimum size we can send, or 0 to always notify
* @since 0.8.9 * @since 0.8.9
*/ */
public void add(PeerState peer) { public void add(PeerState peer, int size) {
boolean wasEmpty = _activePeers.isEmpty();
boolean added = _activePeers.add(peer); boolean added = _activePeers.add(peer);
if (added) { if (added) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -229,9 +229,9 @@ class OutboundMessageFragments {
// no, this doesn't always work. // no, this doesn't always work.
// Also note that the iterator in getNextVolley may have alreay passed us, // Also note that the iterator in getNextVolley may have alreay passed us,
// or not reflect the addition. // or not reflect the addition.
if (_isWaiting || wasEmpty) { if (added || size <= 0 || peer.getSendWindowBytesRemaining() >= size) {
synchronized (_activePeers) { synchronized (_activePeers) {
_activePeers.notifyAll(); _activePeers.notify();
} }
} }
} }
@ -321,7 +321,6 @@ class OutboundMessageFragments {
// if we've gone all the way through the loop, wait // if we've gone all the way through the loop, wait
// ... unless nextSendDelay says we have more ready now // ... unless nextSendDelay says we have more ready now
if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) { if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
_isWaiting = true;
peersProcessed = 0; peersProcessed = 0;
// why? we do this in the loop one at a time // why? we do this in the loop one at a time
//finishMessages(); //finishMessages();
@ -341,7 +340,6 @@ class OutboundMessageFragments {
_log.debug("Woken up while waiting"); _log.debug("Woken up while waiting");
} }
} }
_isWaiting = false;
//} else { //} else {
// if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
// _log.debug("dont wait: alive=" + _alive + " state = " + state); // _log.debug("dont wait: alive=" + _alive + " state = " + state);