Streaming:

Fix slow start (ticket #2708)
Reset retransmission timer after ack (ticket #2710)
Minor cleanups to prep for additional changes
Original analysis and patches from zlatinb
This commit is contained in:
zzz
2020-04-14 12:59:26 +00:00
parent 0d2dbcc8fc
commit 7b47d3f314
7 changed files with 70 additions and 29 deletions

View File

@ -53,6 +53,7 @@ class Connection {
private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
private long _congestionWindowEnd;
private volatile long _highestAckedThrough;
private volatile int _ssthresh;
private final boolean _isInbound;
private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
@ -67,10 +68,9 @@ class Connection {
private final AtomicLong _disconnectScheduledOn = new AtomicLong();
private long _lastReceivedOn;
private final ActivityTimer _activityTimer;
/** window size when we last saw congestion */
private int _lastCongestionSeenAt;
private long _lastCongestionTime;
private volatile long _lastCongestionHighestUnacked;
private volatile long _nextRetransmitTime;
/** has the other side choked us? */
private volatile boolean _isChoked;
/** are we choking the other side? */
@ -156,7 +156,7 @@ class Connection {
_createdOn = _context.clock().now();
_congestionWindowEnd = _options.getWindowSize()-1;
_highestAckedThrough = -1;
_lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
_ssthresh = _options.getMaxWindowSize();
_lastCongestionTime = -1;
_lastCongestionHighestUnacked = -1;
_lastReceivedOn = -1;
@ -170,6 +170,13 @@ class Connection {
if (_log.shouldLog(Log.INFO))
_log.info("New connection created with options: " + _options);
}
/**
* @since 0.9.46
*/
int getSSThresh() {
return _ssthresh;
}
public long getNextOutboundPacketNum() {
return _lastSendId.incrementAndGet();
@ -557,8 +564,10 @@ class Connection {
}
_outboundPackets.notifyAll();
}
if ((acked != null) && (!acked.isEmpty()) )
if ((acked != null) && (!acked.isEmpty()) ) {
_ackSinceCongestion.set(true);
_nextRetransmitTime = _context.clock().now() + getOptions().getRTO();
}
return acked;
}
@ -1137,13 +1146,10 @@ class Connection {
return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn);
}
public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; }
private void congestionOccurred() {
// if we hit congestion and e.g. 5 packets are resent,
// dont set the size to (winSize >> 4). only set the
if (_ackSinceCongestion.compareAndSet(true,false)) {
_lastCongestionSeenAt = _options.getWindowSize();
_lastCongestionTime = _context.clock().now();
_lastCongestionHighestUnacked = _lastSendId.get();
}
@ -1383,7 +1389,7 @@ class Connection {
buf.append(" sent: ").append(1 + _lastSendId.get());
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
buf.append(" ackThru ").append(_highestAckedThrough);
buf.append(" ssThresh ").append(_ssthresh);
buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
buf.append(" MTU ").append(getOptions().getMaxMessageSize());
@ -1420,6 +1426,7 @@ class Connection {
class ResendPacketEvent extends SimpleTimer2.TimedEvent {
private final PacketLocal _packet;
private long _nextSend;
private boolean _fastRetransmit;
public ResendPacketEvent(PacketLocal packet, long delay) {
super(_timer);
@ -1433,6 +1440,14 @@ class Connection {
public void timeReached() { retransmit(); }
/**
* @since 0.9.46
*/
void fastRetransmit() {
_fastRetransmit = true;
reschedule(0);
}
/**
* Retransmit the packet if we need to.
*
@ -1452,6 +1467,16 @@ class Connection {
_packet.cancelled();
return false;
}
long now = _context.clock().now();
long nextRetransmitTime = _nextRetransmitTime;
if (nextRetransmitTime > now && !_fastRetransmit) {
long delay = nextRetransmitTime - now;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend time reached but will be delayed " + delay + " for packet " + _packet);
forceReschedule(delay);
return false;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Resend period reached for " + _packet);
@ -1469,8 +1494,7 @@ class Connection {
resend = true;
}
if ( (resend) && (_packet.getAckTime() <= 0) ) {
boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
if ( (!isLowest) && (!fastRetransmit) ) {
if ( (!isLowest) && (!_fastRetransmit) ) {
// we want to resend this packet, but there are already active
// resends in the air and we dont want to make a bad situation
// worse. wait another second
@ -1487,7 +1511,7 @@ class Connection {
// It's the lowest, or it's fast retransmit time. Resend the packet.
if (fastRetransmit)
if (_fastRetransmit)
_context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
// revamp various fields, in case we need to ack more, etc
@ -1536,6 +1560,11 @@ class Connection {
getOptions().doubleRTO();
getOptions().setWindowSize(newWindowSize);
if (_packet.getNumSends() == 1) {
int flightSize = getUnackedPacketsSent();
_ssthresh = Math.max( flightSize / 2, 2 );
}
if (_log.shouldLog(Log.INFO))
_log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize
+ "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString());
@ -1589,7 +1618,7 @@ class Connection {
_activeResends.incrementAndGet();
if (_log.shouldLog(Log.INFO))
_log.info("Resent packet " +
(fastRetransmit ? "(fast) " : "(timeout) ") +
(_fastRetransmit ? "(fast) " : "(timeout) ") +
_packet +
" next resend in " + timeout + "ms" +
" activeResends: " + _activeResends +
@ -1598,6 +1627,7 @@ class Connection {
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_unackedPacketsReceived.set(0);
_lastSendTime = _context.clock().now();
_fastRetransmit = false;
// timer reset added 0.9.1
resetActivityTimer();
}

View File

@ -122,7 +122,6 @@ class ConnectionManager {
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
// Stats for Connection
@ -765,7 +764,6 @@ class ConnectionManager {
_context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
if (I2PSocketManagerFull.pcapWriter != null)
I2PSocketManagerFull.pcapWriter.flush();

View File

@ -204,9 +204,11 @@ class ConnectionPacketHandler {
// see tickets 1939 and 2584
con.setNextSendTime(_context.clock().now() + IMMEDIATE_ACK_DELAY);
} else {
int delay = con.getOptions().getSendAckDelay();
int delay;
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested
delay = packet.getOptionalDelay();
else
delay = con.getOptions().getSendAckDelay();
con.setNextSendTime(delay + _context.clock().now());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling ack in " + delay + "ms for received packet " + packet);
@ -432,8 +434,8 @@ class ConnectionPacketHandler {
_context.statManager().addRateData("stream.trend", trend, newWindowSize);
if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
// Don't make this <= LastCongestion/2 or we'll jump right back to where we were
int ssthresh = con.getSSThresh();
if (newWindowSize < ssthresh) {
// slow start - exponential growth
// grow acked/N times (where N = the slow start factor)
// always grow at least 1
@ -446,7 +448,7 @@ class ConnectionPacketHandler {
if (newWindowSize >= MAX_SLOW_START_WINDOW)
newWindowSize++;
else
newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
newWindowSize = Math.min(ssthresh, newWindowSize + acked);
} else if (acked < factor)
newWindowSize++;
else
@ -483,8 +485,8 @@ class ConnectionPacketHandler {
con.setCongestionWindowEnd(newWindowSize + lowest);
if (_log.shouldLog(Log.INFO))
_log.info("New window size " + newWindowSize + "/" + oldWindow + "/" + con.getOptions().getWindowSize() + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
_log.info("New window size " + newWindowSize + "/" + oldWindow + "/" + con.getOptions().getWindowSize()
+ " (#resends: " + numResends
+ ") for " + con);
} else {
if (_log.shouldLog(Log.DEBUG))

View File

@ -13,7 +13,6 @@ import net.i2p.data.SessionTag;
import net.i2p.data.SigningPrivateKey;
import net.i2p.client.streaming.I2PSocketException;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* This is the class used for outbound packets.
@ -36,7 +35,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private long _cancelledOn;
private final AtomicInteger _nackCount = new AtomicInteger();
private volatile boolean _retransmitted;
private volatile SimpleTimer2.TimedEvent _resendEvent;
private volatile Connection.ResendPacketEvent _resendEvent;
/** not bound to a connection */
public PacketLocal(I2PAppContext ctx, Destination to, I2PSession session) {
@ -133,13 +132,14 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
public long getCreatedOn() { return _createdOn; }
public long getLifetime() { return _context.clock().now() - _createdOn; }
public void incrementSends() {
_numSends.incrementAndGet();
_lastSend = _context.clock().now();
}
private void cancelResend() {
SimpleTimer2.TimedEvent ev = _resendEvent;
Connection.ResendPacketEvent ev = _resendEvent;
if (ev != null)
ev.cancel();
}
@ -166,7 +166,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
_log.debug("Cancelled! " + toString(), new Exception("cancelled"));
}
public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; }
public Connection.ResendPacketEvent getResendEvent() { return _resendEvent; }
/** how long after packet creation was it acked?
* @return how long after packet creation the packet was ACKed in ms
@ -177,6 +177,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
else
return (int)(_ackOn - _createdOn);
}
public int getNumSends() { return _numSends.get(); }
public long getLastSend() { return _lastSend; }
@ -189,11 +190,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
*/
public void incrementNACKs() {
final int cnt = _nackCount.incrementAndGet();
SimpleTimer2.TimedEvent evt = _resendEvent;
Connection.ResendPacketEvent evt = _resendEvent;
if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
(_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) { // Don't fast retx if we recently resent it
_retransmitted = true;
evt.reschedule(0);
evt.fastRetransmit();
// the predicate used to be '+', changing to '-' --zab
if (_log.shouldLog(Log.DEBUG)) {
@ -209,9 +210,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
_log.debug(log);
}
}
public int getNACKs() { return _nackCount.get(); }
public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; }
public void setResendPacketEvent(Connection.ResendPacketEvent evt) { _resendEvent = evt; }
/**
* Sign and write the packet to the buffer (starting at the offset) and return

View File

@ -117,7 +117,7 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
// this should not block!
begin = _context.clock().now();
long expires = 0;
Connection.ResendPacketEvent rpe = (Connection.ResendPacketEvent) packet.getResendEvent();
Connection.ResendPacketEvent rpe = packet.getResendEvent();
if (rpe != null) {
// we want the router to expire it a little before we do,
// so if we retransmit it will use a new tunnel/lease combo