From 7b47d3f3145c5f961f6abbd9a98ea3669287b9d6 Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 14 Apr 2020 12:59:26 +0000 Subject: [PATCH] Streaming: Fix slow start (ticket #2708) Reset retransmission timer after ack (ticket #2710) Minor cleanups to prep for additional changes Original analysis and patches from zlatinb --- .../i2p/client/streaming/impl/Connection.java | 54 ++++++++++++++----- .../streaming/impl/ConnectionManager.java | 2 - .../impl/ConnectionPacketHandler.java | 14 ++--- .../client/streaming/impl/PacketLocal.java | 16 +++--- .../client/streaming/impl/PacketQueue.java | 2 +- history.txt | 9 ++++ .../src/net/i2p/router/RouterVersion.java | 2 +- 7 files changed, 70 insertions(+), 29 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index ed549b8ab2..1a399426b8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -53,6 +53,7 @@ class Connection { private final AtomicInteger _unackedPacketsReceived = new AtomicInteger(); private long _congestionWindowEnd; private volatile long _highestAckedThrough; + private volatile int _ssthresh; private final boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ @@ -67,10 +68,9 @@ class Connection { private final AtomicLong _disconnectScheduledOn = new AtomicLong(); private long _lastReceivedOn; private final ActivityTimer _activityTimer; - /** window size when we last saw congestion */ - private int _lastCongestionSeenAt; private long _lastCongestionTime; private volatile long _lastCongestionHighestUnacked; + private volatile long _nextRetransmitTime; /** has the other side choked us? */ private volatile boolean _isChoked; /** are we choking the other side? */ @@ -156,7 +156,7 @@ class Connection { _createdOn = _context.clock().now(); _congestionWindowEnd = _options.getWindowSize()-1; _highestAckedThrough = -1; - _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow + _ssthresh = _options.getMaxWindowSize(); _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; _lastReceivedOn = -1; @@ -170,6 +170,13 @@ class Connection { if (_log.shouldLog(Log.INFO)) _log.info("New connection created with options: " + _options); } + + /** + * @since 0.9.46 + */ + int getSSThresh() { + return _ssthresh; + } public long getNextOutboundPacketNum() { return _lastSendId.incrementAndGet(); @@ -557,8 +564,10 @@ class Connection { } _outboundPackets.notifyAll(); } - if ((acked != null) && (!acked.isEmpty()) ) + if ((acked != null) && (!acked.isEmpty()) ) { _ackSinceCongestion.set(true); + _nextRetransmitTime = _context.clock().now() + getOptions().getRTO(); + } return acked; } @@ -1137,13 +1146,10 @@ class Connection { return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn); } - public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; } - private void congestionOccurred() { // if we hit congestion and e.g. 5 packets are resent, // dont set the size to (winSize >> 4). only set the if (_ackSinceCongestion.compareAndSet(true,false)) { - _lastCongestionSeenAt = _options.getWindowSize(); _lastCongestionTime = _context.clock().now(); _lastCongestionHighestUnacked = _lastSendId.get(); } @@ -1383,7 +1389,7 @@ class Connection { buf.append(" sent: ").append(1 + _lastSendId.get()); buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" ackThru ").append(_highestAckedThrough); - + buf.append(" ssThresh ").append(_ssthresh); buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize()); @@ -1420,6 +1426,7 @@ class Connection { class ResendPacketEvent extends SimpleTimer2.TimedEvent { private final PacketLocal _packet; private long _nextSend; + private boolean _fastRetransmit; public ResendPacketEvent(PacketLocal packet, long delay) { super(_timer); @@ -1433,6 +1440,14 @@ class Connection { public void timeReached() { retransmit(); } + /** + * @since 0.9.46 + */ + void fastRetransmit() { + _fastRetransmit = true; + reschedule(0); + } + /** * Retransmit the packet if we need to. * @@ -1452,6 +1467,16 @@ class Connection { _packet.cancelled(); return false; } + + long now = _context.clock().now(); + long nextRetransmitTime = _nextRetransmitTime; + if (nextRetransmitTime > now && !_fastRetransmit) { + long delay = nextRetransmitTime - now; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resend time reached but will be delayed " + delay + " for packet " + _packet); + forceReschedule(delay); + return false; + } //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Resend period reached for " + _packet); @@ -1469,8 +1494,7 @@ class Connection { resend = true; } if ( (resend) && (_packet.getAckTime() <= 0) ) { - boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1)); - if ( (!isLowest) && (!fastRetransmit) ) { + if ( (!isLowest) && (!_fastRetransmit) ) { // we want to resend this packet, but there are already active // resends in the air and we dont want to make a bad situation // worse. wait another second @@ -1487,7 +1511,7 @@ class Connection { // It's the lowest, or it's fast retransmit time. Resend the packet. - if (fastRetransmit) + if (_fastRetransmit) _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime()); // revamp various fields, in case we need to ack more, etc @@ -1536,6 +1560,11 @@ class Connection { getOptions().doubleRTO(); getOptions().setWindowSize(newWindowSize); + if (_packet.getNumSends() == 1) { + int flightSize = getUnackedPacketsSent(); + _ssthresh = Math.max( flightSize / 2, 2 ); + } + if (_log.shouldLog(Log.INFO)) _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize + "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString()); @@ -1589,7 +1618,7 @@ class Connection { _activeResends.incrementAndGet(); if (_log.shouldLog(Log.INFO)) _log.info("Resent packet " + - (fastRetransmit ? "(fast) " : "(timeout) ") + + (_fastRetransmit ? "(fast) " : "(timeout) ") + _packet + " next resend in " + timeout + "ms" + " activeResends: " + _activeResends + @@ -1598,6 +1627,7 @@ class Connection { + (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); _unackedPacketsReceived.set(0); _lastSendTime = _context.clock().now(); + _fastRetransmit = false; // timer reset added 0.9.1 resetActivityTimer(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index 0f6bd1920a..57fcbd43f1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -122,7 +122,6 @@ class ConnectionManager { _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); // Stats for Connection @@ -765,7 +764,6 @@ class ConnectionManager { _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime()); if (I2PSocketManagerFull.pcapWriter != null) I2PSocketManagerFull.pcapWriter.flush(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index 3466ee6f0a..d6b7ba5773 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -204,9 +204,11 @@ class ConnectionPacketHandler { // see tickets 1939 and 2584 con.setNextSendTime(_context.clock().now() + IMMEDIATE_ACK_DELAY); } else { - int delay = con.getOptions().getSendAckDelay(); + int delay; if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested delay = packet.getOptionalDelay(); + else + delay = con.getOptions().getSendAckDelay(); con.setNextSendTime(delay + _context.clock().now()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Scheduling ack in " + delay + "ms for received packet " + packet); @@ -432,8 +434,8 @@ class ConnectionPacketHandler { _context.statManager().addRateData("stream.trend", trend, newWindowSize); if ( (!congested) && (acked > 0) && (numResends <= 0) ) { - if (newWindowSize < con.getLastCongestionSeenAt() / 2) { - // Don't make this <= LastCongestion/2 or we'll jump right back to where we were + int ssthresh = con.getSSThresh(); + if (newWindowSize < ssthresh) { // slow start - exponential growth // grow acked/N times (where N = the slow start factor) // always grow at least 1 @@ -446,7 +448,7 @@ class ConnectionPacketHandler { if (newWindowSize >= MAX_SLOW_START_WINDOW) newWindowSize++; else - newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked); + newWindowSize = Math.min(ssthresh, newWindowSize + acked); } else if (acked < factor) newWindowSize++; else @@ -483,8 +485,8 @@ class ConnectionPacketHandler { con.setCongestionWindowEnd(newWindowSize + lowest); if (_log.shouldLog(Log.INFO)) - _log.info("New window size " + newWindowSize + "/" + oldWindow + "/" + con.getOptions().getWindowSize() + " congestionSeenAt: " - + con.getLastCongestionSeenAt() + " (#resends: " + numResends + _log.info("New window size " + newWindowSize + "/" + oldWindow + "/" + con.getOptions().getWindowSize() + + " (#resends: " + numResends + ") for " + con); } else { if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java index 4a2ada5141..c66181cb75 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java @@ -13,7 +13,6 @@ import net.i2p.data.SessionTag; import net.i2p.data.SigningPrivateKey; import net.i2p.client.streaming.I2PSocketException; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer2; /** * This is the class used for outbound packets. @@ -36,7 +35,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private long _cancelledOn; private final AtomicInteger _nackCount = new AtomicInteger(); private volatile boolean _retransmitted; - private volatile SimpleTimer2.TimedEvent _resendEvent; + private volatile Connection.ResendPacketEvent _resendEvent; /** not bound to a connection */ public PacketLocal(I2PAppContext ctx, Destination to, I2PSession session) { @@ -133,13 +132,14 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { public long getCreatedOn() { return _createdOn; } public long getLifetime() { return _context.clock().now() - _createdOn; } + public void incrementSends() { _numSends.incrementAndGet(); _lastSend = _context.clock().now(); } private void cancelResend() { - SimpleTimer2.TimedEvent ev = _resendEvent; + Connection.ResendPacketEvent ev = _resendEvent; if (ev != null) ev.cancel(); } @@ -166,7 +166,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { _log.debug("Cancelled! " + toString(), new Exception("cancelled")); } - public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; } + public Connection.ResendPacketEvent getResendEvent() { return _resendEvent; } /** how long after packet creation was it acked? * @return how long after packet creation the packet was ACKed in ms @@ -177,6 +177,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { else return (int)(_ackOn - _createdOn); } + public int getNumSends() { return _numSends.get(); } public long getLastSend() { return _lastSend; } @@ -189,11 +190,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { */ public void incrementNACKs() { final int cnt = _nackCount.incrementAndGet(); - SimpleTimer2.TimedEvent evt = _resendEvent; + Connection.ResendPacketEvent evt = _resendEvent; if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) && (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) { // Don't fast retx if we recently resent it _retransmitted = true; - evt.reschedule(0); + evt.fastRetransmit(); // the predicate used to be '+', changing to '-' --zab if (_log.shouldLog(Log.DEBUG)) { @@ -209,9 +210,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { _log.debug(log); } } + public int getNACKs() { return _nackCount.get(); } - public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; } + public void setResendPacketEvent(Connection.ResendPacketEvent evt) { _resendEvent = evt; } /** * Sign and write the packet to the buffer (starting at the offset) and return diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index fb0e46395d..4288946b42 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -117,7 +117,7 @@ class PacketQueue implements SendMessageStatusListener, Closeable { // this should not block! begin = _context.clock().now(); long expires = 0; - Connection.ResendPacketEvent rpe = (Connection.ResendPacketEvent) packet.getResendEvent(); + Connection.ResendPacketEvent rpe = packet.getResendEvent(); if (rpe != null) { // we want the router to expire it a little before we do, // so if we retransmit it will use a new tunnel/lease combo diff --git a/history.txt b/history.txt index e2e6e6dde8..40e9aa305b 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,12 @@ +2020-04-14 zzz + * Ratchet: Improve muxed decrypt + * Streaming: + - Fix slow start (ticket #2708) + - Reset retransmission timer after ack (ticket #2710) + +2020-04-13 zzz + * i2ptunnel: Allow comments in CLI command files + 2020-04-10 zzz * Streaming: Fix retransmission time (ticket #2709) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index a6204817e3..f2522cd4ed 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 8; + public final static long BUILD = 9; /** for example "-test" */ public final static String EXTRA = "";