SSU: Redesign of the congestion control (tickets #2412, #2649, #2654, #2713), modelled on TCP Reno (RFCs 5681 and 6298)
- Use a single timer per connection
- Resend up to half the un-acked messages per timer event instead of a single message
- Only send either old or new messages, do not mix
- Cache/avoid several timer calls
- Instead of 3 return values, allocating bandwidth is now a boolean function
- Avoid one of the iterations over all un-acked messages every packet pusher loop
- Remove 100 ms failsafe
- Fix OMF debug log NPE
With the same CPU usage, the bandwidth is much higher
Significant speed improvement for lossy connections (e.g. Wi-Fi)
Patch by zlatinb
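
A minimal sketch of the Reno-style reaction the bullets above describe: collapse the window into the slow start threshold (RFC 5681) and double the RTO while re-arming the single per-connection timer (RFC 6298, section 5.5). Field names follow the PeerState diff below; the constants and class structure are illustrative, not the exact i2p code.

    // Illustrative sketch only, not the actual PeerState implementation.
    class RenoSketch {
        static final int MINIMUM_WINDOW_BYTES = 8 * 1024; // assumed value
        static final int MAX_RTO = 60 * 1000;             // assumed value

        int _sendWindowBytes = 64 * 1024;
        int _slowStartThreshold;
        int _rto = 1000;
        long _retransmitTimer;                            // 0 = not armed

        // On congestion: halve into ssthresh, collapse the window,
        // double the RTO, and push the one retransmit timer out.
        void congestionOccurred(long now, int congestionAt) {
            _slowStartThreshold = congestionAt / 2;
            _sendWindowBytes = MINIMUM_WINDOW_BYTES;
            _rto = Math.min(MAX_RTO, _rto << 1);          // RFC 6298 section 5.5
            _retransmitTimer = now + _rto;
        }

        // On new data ACKed: cancel the timer if nothing is in flight,
        // otherwise push it out by one RTO (RFC 6298 section 5.3).
        void messageACKed(long now, boolean anyPending) {
            _retransmitTimer = anyPending ? now + _rto : 0;
        }
    }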
zzz
2020-10-21 18:14:51 +00:00
parent ee27bc3bbf
commit 49565a99f9
6 changed files with 140 additions and 164 deletions

history.txt

@@ -2,7 +2,9 @@
* NetDB:
- ECIES router support for encrypted lookups and stores (proposal #156)
- Reseed after a long downtime
* SSU: Increase socket buffer size (ticket #2781)
* SSU:
- Increase socket buffer size (ticket #2781)
- Redesign of the congestion control (tickets #2412, #2649, #2654, #2713)
2020-10-17 zzz
* i2psnark: Remove references to "maggot" links

RouterVersion.java

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 8;
public final static long BUILD = 9;
/** for example "-test" */
public final static String EXTRA = "";

OutboundMessageFragments.java

@@ -294,17 +294,17 @@ class OutboundMessageFragments {
// race with add()
_iterator.remove();
if (_log.shouldLog(Log.DEBUG))
_log.debug("No more pending messages for " + peer.getRemotePeer());
_log.debug("No more pending messages for " + p.getRemotePeer());
continue;
}
peersProcessed++;
states = p.allocateSend();
states = p.allocateSend(now);
if (states != null) {
peer = p;
// we have something to send and we will be returning it
break;
}
int delay = p.getNextDelay();
int delay = p.getNextDelay(now);
if (delay < nextSendDelay)
nextSendDelay = delay;
@@ -371,6 +371,16 @@ class OutboundMessageFragments
return packets;
}
/**
* Wakes up the packet pusher thread.
* @since 0.9.48
*/
void nudge() {
synchronized(_activePeers) {
_activePeers.notify();
}
}
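
nudge() pairs with the packet pusher's wait on the same monitor. A hedged sketch of that wait side, assuming the pusher parks on _activePeers between volleys as getNextVolley() does; the real loop carries more bookkeeping.

    // Sketch only: notify() from nudge() wakes this wait early, so the
    // pusher can retry allocateSend() as soon as an ACK frees window
    // space, instead of sleeping out the full delay.
    private void waitForVolley(int nextSendDelay) {
        synchronized (_activePeers) {
            try {
                if (nextSendDelay > 0)
                    _activePeers.wait(nextSendDelay);
                else
                    _activePeers.wait();
            } catch (InterruptedException ie) {} // loop around and retry
        }
    }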
/**
* @return null if state or peer is null
*/

OutboundMessageState.java

@@ -31,7 +31,6 @@ class OutboundMessageState implements CDPQEntry {
private long _fragmentAcks;
private final int _numFragments;
private final long _startedOn;
private long _nextSendTime;
private int _pushCount;
private int _maxSends;
// we can't use the ones in _message since it is null for injections
@@ -77,7 +76,6 @@ class OutboundMessageState implements CDPQEntry {
_i2npMessage = msg;
_peer = peer;
_startedOn = _context.clock().now();
_nextSendTime = _startedOn;
_expiration = _startedOn + EXPIRATION;
//_expiration = msg.getExpiration();
@@ -166,9 +164,6 @@ class OutboundMessageState implements CDPQEntry {
return isComplete();
}
public long getNextSendTime() { return _nextSendTime; }
public void setNextSendTime(long when) { _nextSendTime = when; }
/**
* The max number of sends for any fragment, which is the
* same as the push count, at least as it's coded now.
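
Net effect of this file's hunks: per-message send timing is gone from OutboundMessageState, so the pusher no longer scans every un-acked message for the earliest timer. A simplified before/after comparison, not the exact code:

    // Before: O(n) scan of per-message timers on every pusher pass.
    long oldNextDelay(List<OutboundMessageState> unacked, long now) {
        long next = Long.MAX_VALUE;
        for (OutboundMessageState s : unacked)
            next = Math.min(next, s.getNextSendTime());
        return next - now;
    }

    // After: one timer per connection, read in O(1) (see PeerState below).
    long newNextDelay(long retransmitTimer, long now) {
        return retransmitTimer - now;
    }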

PeerState.java

@@ -219,8 +219,8 @@ public class PeerState {
//private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
private final PriBlockingQueue<OutboundMessageState> _outboundQueue;
/** which outbound message is currently being retransmitted */
private OutboundMessageState _retransmitter;
/** when the retransmit timer is about to trigger */
private long _retransmitTimer;
private final UDPTransport _transport;
@@ -246,9 +246,6 @@ public class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
/** max number of msgs returned from allocateSend() */
private static final int MAX_ALLOCATE_SEND = 2;
/**
* Was 32 before 0.9.2, but since the streaming lib goes up to 128,
* we would just drop our own msgs right away during slow start.
@@ -693,15 +690,14 @@ public class PeerState {
*
* Caller should synch
*/
private boolean allocateSendingBytes(int size, int messagePushCount) {
return allocateSendingBytes(size, false, messagePushCount);
private boolean allocateSendingBytes(int size, int messagePushCount, long now) {
return allocateSendingBytes(size, false, messagePushCount, now);
}
/**
* Caller should synch
*/
private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
long now = _context.clock().now();
private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount, long now) {
long duration = now - _lastSendRefill;
if (duration >= 1000) {
_sendWindowBytesRemaining = _sendWindowBytes;
@@ -928,6 +924,14 @@ public class PeerState {
_sendWindowBytes = MINIMUM_WINDOW_BYTES;
//if (congestionAt/2 < _slowStartThreshold)
_slowStartThreshold = congestionAt/2;
int oldRto = _rto;
long oldTimer = _retransmitTimer - now;
_rto = Math.min(MAX_RTO, Math.max(minRTO(), _rto << 1 ));
_retransmitTimer = now + _rto;
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now));
return true;
}
@@ -1186,7 +1190,7 @@ public class PeerState {
* We sent a message which was ACKed containing the given # of bytes.
* Caller should synch on this
*/
private void locked_messageACKed(int bytesACKed, long lifetime, int numSends) {
private void locked_messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
_consecutiveFailedSends = 0;
// _lastFailedSendPeriod = -1;
if (numSends < 2) {
@@ -1231,17 +1235,31 @@ public class PeerState {
adjustMTU();
//}
}
if (!anyPending) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " nothing pending, cancelling timer");
_retransmitTimer = 0;
} else {
// any time new data gets acked, push out the timer
long now = _context.clock().now();
long oldTimer = _retransmitTimer - now;
_retransmitTimer = now + getRTO();
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " ACK, timer: " + oldTimer + " -> " + (_retransmitTimer - now));
}
_transport.getOMF().nudge();
}
/**
* We sent a message which was ACKed containing the given # of bytes.
*/
private void messageACKed(int bytesACKed, long lifetime, int numSends) {
private void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
synchronized(this) {
locked_messageACKed(bytesACKed, lifetime, numSends);
locked_messageACKed(bytesACKed, lifetime, numSends, anyPending);
}
if (numSends >= 2 && _log.shouldLog(Log.INFO))
_log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_context.statManager().addRateData("udp.sendBps", _sendBps);
}
@@ -1548,7 +1566,6 @@ public class PeerState {
List<OutboundMessageState> tempList;
synchronized (_outboundMessages) {
_retransmitter = null;
tempList = new ArrayList<OutboundMessageState>(_outboundMessages);
_outboundMessages.clear();
}
@@ -1610,21 +1627,15 @@ public class PeerState {
OutboundMessageState state = iter.next();
if (state.isComplete()) {
iter.remove();
if (_retransmitter == state)
_retransmitter = null;
if (succeeded == null) succeeded = new ArrayList<OutboundMessageState>(4);
succeeded.add(state);
} else if (state.isExpired(now)) {
iter.remove();
if (_retransmitter == state)
_retransmitter = null;
_context.statManager().addRateData("udp.sendFailed", state.getPushCount());
if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
failed.add(state);
} else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
iter.remove();
if (state == _retransmitter)
_retransmitter = null;
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount());
if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
failed.add(state);
@@ -1660,59 +1671,84 @@ public class PeerState {
/**
* Pick one or more messages we want to send and allocate them out of our window
* Adjusts the retransmit timer if necessary.
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned &gt; 0.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
* TODO combine finishMessages() and allocateSend() so we don't iterate 2 times.
*
* @return allocated messages to send (never empty), or null if no messages or no resources
*/
List<OutboundMessageState> allocateSend() {
List<OutboundMessageState> allocateSend(long now) {
long retransmitTimer;
synchronized(this) {
retransmitTimer = _retransmitTimer;
}
List<OutboundMessageState> rv = allocateSend2(retransmitTimer > 0 && now >= retransmitTimer, now);
if (rv != null && !rv.isEmpty()) {
synchronized(this) {
long old = _retransmitTimer;
if (_retransmitTimer == 0)
_retransmitTimer = now + getRTO();
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer);
}
}
return rv;
}
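Where allocateSend() sits in the pusher's per-peer sequence, per the javadoc above. A hedged sketch; the finishMessages() call and the surrounding loop are simplified assumptions, only allocateSend(now) and getNextDelay(now) match the diff.

    // Sketch of the high-usage path: reap finished messages first, then
    // ask for a volley, and only compute a sleep delay when nothing was
    // allocated. _nextSendDelay is a hypothetical field for illustration.
    List<OutboundMessageState> tryToSend(PeerState p, long now) {
        p.finishMessages(now);                    // assumed signature
        List<OutboundMessageState> states = p.allocateSend(now);
        if (states == null)
            _nextSendDelay = Math.min(_nextSendDelay, p.getNextDelay(now));
        return states;
    }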
/**
* Pick one or more messages to send. This will allocate either old or new messages, but not both.
* @param canSendOld if true, already-sent (un-acked) messages may be retransmitted; if false, only new messages will be considered
* @param now the current time
* @since 0.9.48
*/
private List<OutboundMessageState> allocateSend2(boolean canSendOld, long now) {
if (_dead) return null;
List<OutboundMessageState> rv = null;
synchronized (_outboundMessages) {
for (OutboundMessageState state : _outboundMessages) {
// We have 3 return values, because if allocateSendingBytes() returns false,
// then we can stop iterating.
ShouldSend should = locked_shouldSend(state);
if (should == ShouldSend.YES) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
/*
while (iter.hasNext()) {
OutboundMessageState later = (OutboundMessageState)iter.next();
OutNetMessage msg = later.getMessage();
if (msg != null)
msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
}
*/
if (rv == null)
rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
rv.add(state);
if (rv.size() >= MAX_ALLOCATE_SEND)
if (canSendOld) {
for (OutboundMessageState state : _outboundMessages) {
boolean should = locked_shouldSend(state, now);
if (should) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
/*
while (iter.hasNext()) {
OutboundMessageState later = (OutboundMessageState)iter.next();
OutNetMessage msg = later.getMessage();
if (msg != null)
msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
}
*/
if (rv == null)
rv = new ArrayList<OutboundMessageState>(_outboundMessages.size());
rv.add(state);
if (rv.size() >= _outboundMessages.size() / 2)
return rv;
} else {
// no more bandwidth available
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
if (_log.shouldLog(Log.DEBUG)) {
if (rv == null)
_log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
else
_log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
}
return rv;
} else if (should == ShouldSend.NO_BW) {
// no more bandwidth available
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
if (rv == null && _log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
return rv;
} /* else {
OutNetMessage msg = state.getMessage();
if (msg != null)
msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
} */
}
}
return null;
}
// Peek at head of _outboundQueue and see if we can send it.
// If so, pull it off, put it in _outboundMessages, test
// again for bandwidth if necessary, and return it.
OutboundMessageState state;
synchronized (_outboundQueue) {
while ((state = _outboundQueue.peek()) != null &&
ShouldSend.YES == locked_shouldSend(state)) {
locked_shouldSend(state, now)) {
// This is guaranteed to be the same as what we got in peek(),
// due to locking and because we aren't using the dropping CDPBQ.
// If we do switch to CDPBQ,
@@ -1729,9 +1765,9 @@ public class PeerState {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
if (rv == null)
rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
rv = new ArrayList<OutboundMessageState>(_concurrentMessagesAllowed);
rv.add(dequeuedState);
if (rv.size() >= MAX_ALLOCATE_SEND)
if (rv.size() >= _concurrentMessagesAllowed)
return rv;
}
}
@@ -1739,39 +1775,27 @@ public class PeerState {
}
if ( rv == null && _log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
" / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - _context.clock().now()));
return rv;
}
/**
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
* TODO combine finishMessages() and allocateSend() so we don't iterate 2 times.
*
* @param now the current time
* @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
* If ready now, will return 0 or a negative value.
*/
int getNextDelay() {
int getNextDelay(long now) {
int rv = Integer.MAX_VALUE;
if (_dead) return rv;
long now = _context.clock().now();
synchronized (_outboundMessages) {
if (_retransmitter != null) {
rv = (int)(_retransmitter.getNextSendTime() - now);
return rv;
}
for (OutboundMessageState state : _outboundMessages) {
int delay = (int)(state.getNextSendTime() - now);
// short circuit once we hit something ready to go
if (delay <= 0)
return delay;
if (delay < rv)
rv = delay;
}
synchronized(this) {
if (_retransmitTimer >= now)
return (int) (_retransmitTimer - now);
}
// failsafe... is this OK?
if (rv > 100 && !_outboundQueue.isEmpty())
rv = 100;
return rv;
}
@@ -1782,20 +1806,6 @@ public class PeerState {
return _dead || _outboundQueue.isBacklogged();
}
/**
* If set to true, we should throttle retransmissions of all but the first message in
* flight to a peer. If set to false, we will only throttle the initial flight of a
* message to a peer while a retransmission is going on.
*/
private static final boolean THROTTLE_RESENDS = true;
/**
* if true, throttle the initial volley of a message if there is a resend in progress.
* if false, always send the first volley, regardless of retransmissions (but keeping in
* mind bw/cwin throttle, etc)
*
*/
private static final boolean THROTTLE_INITIAL_SEND = true;
/**
* Always leave room for this many explicit acks.
* Only for data packets. Does not affect ack-only packets.
@@ -1817,88 +1827,42 @@ public class PeerState {
MIN_ACK_SIZE;
}
private enum ShouldSend { YES, NO, NO_BW };
/**
* Have 3 return values, because if allocateSendingBytes() returns false,
* then allocateSend() can stop iterating
*
* Caller should synch
*/
private ShouldSend locked_shouldSend(OutboundMessageState state) {
long now = _context.clock().now();
if (state.getNextSendTime() <= now) {
OutboundMessageState retrans = _retransmitter;
if ( (retrans != null) && ( (retrans.isExpired(now) || retrans.isComplete()) ) ) {
_retransmitter = null;
retrans = null;
}
if ( (retrans != null) && (retrans != state) ) {
// choke it, since there's already another message retransmitting to this
// peer.
_context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted);
int max = state.getMaxSends();
if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) {
//if (state.getMessage() != null)
// state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley...");
} else if ( (max <= 0) || (THROTTLE_RESENDS) ) {
//if (state.getMessage() != null)
// state.getMessage().timestamp("choked, with another message retransmitting");
return ShouldSend.NO;
} else {
//if (state.getMessage() != null)
// state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");
}
}
private boolean locked_shouldSend(OutboundMessageState state, long now) {
int size = state.getUnackedSize();
if (allocateSendingBytes(size, state.getPushCount())) {
if (allocateSendingBytes(size, state.getPushCount(), now)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocation of " + size + " allowed with "
_log.debug(_remotePeer + " Allocation of " + size + " allowed with "
+ getSendWindowBytesRemaining()
+ "/" + getSendWindowBytes()
+ " remaining"
+ " for message " + state.getMessageId() + ": " + state);
int rto = getRTO();
if (state.getPushCount() > 0) {
_retransmitter = state;
rto = Math.min(MAX_RTO, rto << state.getPushCount()); // Section 5.5 RFC 6298
}
if (state.push())
_messagesSent++;
// messages with multiple fragments need more time
state.setNextSendTime(now + rto + ((state.getFragmentCount() - 1) * ACKSender.ACK_FREQUENCY));
//if (peer.getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(peer.getRemotePeer());
return ShouldSend.YES;
return true;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount());
//if (state.getMessage() != null)
// state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
_log.info(_remotePeer + " Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+ " available=" + getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
_context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.INFO))
_log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
//_throttle.choke(peer.getRemotePeer());
//if (state.getMessage() != null)
// state.getMessage().timestamp("choked, not enough available, wsize="
// + getSendWindowBytes() + " available="
// + getSendWindowBytesRemaining());
return ShouldSend.NO_BW;
return false;
}
} // nextTime <= now
return ShouldSend.NO;
}
/**
@@ -1910,6 +1874,7 @@ public class PeerState {
boolean acked(long messageId) {
if (_dead) return false;
OutboundMessageState state = null;
boolean anyPending;
synchronized (_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
state = iter.next();
@@ -1925,8 +1890,7 @@ public class PeerState {
state = null;
}
}
if ( (state != null) && (state == _retransmitter) )
_retransmitter = null;
anyPending = !_outboundMessages.isEmpty();
}
if (state != null) {
@@ -1948,7 +1912,7 @@ public class PeerState {
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
_transport.succeeded(state);
// this adjusts the rtt/rto/window/etc
messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
//if (getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(peer.getRemotePeer());
@@ -1976,6 +1940,7 @@ public class PeerState {
OutboundMessageState state = null;
boolean isComplete = false;
boolean anyPending;
synchronized (_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
state = iter.next();
@@ -1984,8 +1949,6 @@ public class PeerState {
if (complete) {
isComplete = true;
iter.remove();
if (state == _retransmitter)
_retransmitter = null;
}
break;
} else if (state.getPushCount() <= 0) {
@@ -1997,6 +1960,7 @@ public class PeerState {
state = null;
}
}
anyPending = !_outboundMessages.isEmpty();
}
if (state != null) {
@@ -2020,7 +1984,7 @@ public class PeerState {
_transport.succeeded(state);
// this adjusts the rtt/rto/window/etc
messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
//if (state.getPeer().getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(state.getPeer().getRemotePeer());
@@ -2085,13 +2049,10 @@ public class PeerState {
synchronized (oldPeer._outboundMessages) {
tmp2.addAll(oldPeer._outboundMessages);
oldPeer._outboundMessages.clear();
retransmitter = oldPeer._retransmitter;
oldPeer._retransmitter = null;
}
if (!_dead) {
synchronized (_outboundMessages) {
_outboundMessages.addAll(tmp2);
_retransmitter = retransmitter;
}
}
}

UDPTransport.java

@@ -345,6 +345,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_context.simpleTimer2().addPeriodicEvent(new PingIntroducers(), MIN_EXPIRE_TIMEOUT * 3 / 4);
}
/**
* @return the instance of OutboundMessageFragments
* @since 0.9.48
*/
OutboundMessageFragments getOMF() {
return _fragments;
}
/**
* Pick a port if not previously configured, so that TransportManager may