forked from I2P_Developers/i2p.i2p
Streaming: Westwood+ congestion control (ticket #2719)
Increase max slow start window to 64 Change RTT calculations from double to float Original idea from jogger Original patch from zlatinb Developed and tested with zlatinb
This commit is contained in:
@ -0,0 +1,20 @@
|
|||||||
|
package net.i2p.client.streaming.impl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Westwood bandwidth estimator
|
||||||
|
*
|
||||||
|
* @since 0.9.46
|
||||||
|
*/
|
||||||
|
interface BandwidthEstimator {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Records an arriving ack.
|
||||||
|
* @param acked how many packets were acked with this ack
|
||||||
|
*/
|
||||||
|
public void addSample(int acked);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the current bandwidth estimate in packets/ms.
|
||||||
|
*/
|
||||||
|
public float getBandwidthEstimate();
|
||||||
|
}
|
@ -89,6 +89,7 @@ class Connection {
|
|||||||
private final int _localPort;
|
private final int _localPort;
|
||||||
private final int _remotePort;
|
private final int _remotePort;
|
||||||
private final SimpleTimer2 _timer;
|
private final SimpleTimer2 _timer;
|
||||||
|
private final BandwidthEstimator _bwEstimator;
|
||||||
|
|
||||||
private final AtomicLong _lifetimeBytesSent = new AtomicLong();
|
private final AtomicLong _lifetimeBytesSent = new AtomicLong();
|
||||||
/** TBD for tcpdump-compatible ack output */
|
/** TBD for tcpdump-compatible ack output */
|
||||||
@ -170,6 +171,7 @@ class Connection {
|
|||||||
_nextSendLock = new Object();
|
_nextSendLock = new Object();
|
||||||
_connectionEvent = new ConEvent();
|
_connectionEvent = new ConEvent();
|
||||||
_retransmitEvent = new RetransmitEvent();
|
_retransmitEvent = new RetransmitEvent();
|
||||||
|
_bwEstimator = new SimpleBandwidthEstimator(ctx, _options);
|
||||||
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
|
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
|
||||||
// all createRateStats in ConnectionManager
|
// all createRateStats in ConnectionManager
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
@ -579,6 +581,7 @@ class Connection {
|
|||||||
}
|
}
|
||||||
if ((acked != null) && (!acked.isEmpty()) ) {
|
if ((acked != null) && (!acked.isEmpty()) ) {
|
||||||
_ackSinceCongestion.set(true);
|
_ackSinceCongestion.set(true);
|
||||||
|
_bwEstimator.addSample(acked.size());
|
||||||
if (anyLeft) {
|
if (anyLeft) {
|
||||||
// RFC 6298 section 5.3
|
// RFC 6298 section 5.3
|
||||||
int rto = getOptions().getRTO();
|
int rto = getOptions().getRTO();
|
||||||
@ -1417,6 +1420,7 @@ class Connection {
|
|||||||
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
|
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
|
||||||
buf.append(" ackThru ").append(_highestAckedThrough);
|
buf.append(" ackThru ").append(_highestAckedThrough);
|
||||||
buf.append(" ssThresh ").append(_ssthresh);
|
buf.append(" ssThresh ").append(_ssthresh);
|
||||||
|
buf.append(" minRTT ").append(getOptions().getMinRTT());
|
||||||
buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
|
buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
|
||||||
buf.append(" MTU ").append(getOptions().getMaxMessageSize());
|
buf.append(" MTU ").append(getOptions().getMaxMessageSize());
|
||||||
|
|
||||||
@ -1477,7 +1481,7 @@ class Connection {
|
|||||||
final long now = _context.clock().now();
|
final long now = _context.clock().now();
|
||||||
pushBackRTO(getOptions().doubleRTO());
|
pushBackRTO(getOptions().doubleRTO());
|
||||||
|
|
||||||
// 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
|
// 2. cut ssthresh to bandwidth estimate, window to 1
|
||||||
List<PacketLocal> toResend = null;
|
List<PacketLocal> toResend = null;
|
||||||
synchronized(_outboundPackets) {
|
synchronized(_outboundPackets) {
|
||||||
if (_outboundPackets.isEmpty()) {
|
if (_outboundPackets.isEmpty()) {
|
||||||
@ -1490,8 +1494,8 @@ class Connection {
|
|||||||
if (oldest.getNumSends() == 1) {
|
if (oldest.getNumSends() == 1) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug(Connection.this + " cutting ssthresh and window");
|
_log.debug(Connection.this + " cutting ssthresh and window");
|
||||||
int flightSize = _outboundPackets.size();
|
_ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 );
|
||||||
_ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
|
_ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh);
|
||||||
getOptions().setWindowSize(1);
|
getOptions().setWindowSize(1);
|
||||||
} else if (_log.shouldLog(Log.DEBUG))
|
} else if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug(Connection.this + " not cutting ssthresh and window");
|
_log.debug(Connection.this + " not cutting ssthresh and window");
|
||||||
@ -1693,11 +1697,12 @@ class Connection {
|
|||||||
// This prevents being stuck at a window size of 1, retransmitting every packet,
|
// This prevents being stuck at a window size of 1, retransmitting every packet,
|
||||||
// never updating the RTT or RTO.
|
// never updating the RTT or RTO.
|
||||||
getOptions().doubleRTO();
|
getOptions().doubleRTO();
|
||||||
getOptions().setWindowSize(1);
|
|
||||||
|
|
||||||
if (_packet.getNumSends() == 1) {
|
if (_packet.getNumSends() == 1) {
|
||||||
int flightSize = getUnackedPacketsSent();
|
_ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 );
|
||||||
_ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
|
_ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh);
|
||||||
|
int wsize = getOptions().getWindowSize();
|
||||||
|
getOptions().setWindowSize(Math.min(_ssthresh, wsize));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
|
@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
|||||||
private int _receiveWindow;
|
private int _receiveWindow;
|
||||||
private int _profile;
|
private int _profile;
|
||||||
private int _rtt;
|
private int _rtt;
|
||||||
|
private int _minRtt = Integer.MAX_VALUE;
|
||||||
private int _rttDev;
|
private int _rttDev;
|
||||||
private int _rto = INITIAL_RTO;
|
private int _rto = INITIAL_RTO;
|
||||||
private int _resendDelay;
|
private int _resendDelay;
|
||||||
@ -83,9 +84,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
|||||||
* These values are specified in RFC 6298
|
* These values are specified in RFC 6298
|
||||||
* Do not change unless you know what you're doing
|
* Do not change unless you know what you're doing
|
||||||
*/
|
*/
|
||||||
private static final double TCP_ALPHA = 1.0/8;
|
private static final float TCP_ALPHA = 1.0f/8;
|
||||||
private static final double TCP_BETA = 1.0/4;
|
private static final float TCP_BETA = 1.0f/4;
|
||||||
private static final double TCP_KAPPA = 4;
|
private static final float TCP_KAPPA = 4;
|
||||||
|
|
||||||
private static final String PROP_INITIAL_RTO = "i2p.streaming.initialRTO";
|
private static final String PROP_INITIAL_RTO = "i2p.streaming.initialRTO";
|
||||||
private static final int INITIAL_RTO = 9000;
|
private static final int INITIAL_RTO = 9000;
|
||||||
@ -578,6 +579,13 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
|||||||
*/
|
*/
|
||||||
public synchronized int getRTT() { return _rtt; }
|
public synchronized int getRTT() { return _rtt; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return minimum RTT observed over the life of the connection, greater than zero
|
||||||
|
* @since 0.9.46
|
||||||
|
*/
|
||||||
|
public synchronized int getMinRTT() {return _minRtt; }
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* not public, use updateRTT()
|
* not public, use updateRTT()
|
||||||
*/
|
*/
|
||||||
@ -677,6 +685,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
|||||||
* @param measuredValue must be positive
|
* @param measuredValue must be positive
|
||||||
*/
|
*/
|
||||||
public synchronized void updateRTT(int measuredValue) {
|
public synchronized void updateRTT(int measuredValue) {
|
||||||
|
_minRtt = Math.min(_minRtt, measuredValue);
|
||||||
switch(_initState) {
|
switch(_initState) {
|
||||||
case INIT:
|
case INIT:
|
||||||
_initState = AckInit.FIRST;
|
_initState = AckInit.FIRST;
|
||||||
|
@ -28,7 +28,7 @@ class ConnectionPacketHandler {
|
|||||||
private final Log _log;
|
private final Log _log;
|
||||||
private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
|
private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
|
||||||
|
|
||||||
public static final int MAX_SLOW_START_WINDOW = 24;
|
public static final int MAX_SLOW_START_WINDOW = 64;
|
||||||
|
|
||||||
// see tickets 1939 and 2584
|
// see tickets 1939 and 2584
|
||||||
private static final int IMMEDIATE_ACK_DELAY = 150;
|
private static final int IMMEDIATE_ACK_DELAY = 150;
|
||||||
|
@ -0,0 +1,159 @@
|
|||||||
|
package net.i2p.client.streaming.impl;
|
||||||
|
|
||||||
|
import net.i2p.I2PAppContext;
|
||||||
|
import net.i2p.data.DataHelper;
|
||||||
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Westwood+ bandwidth estimator with
|
||||||
|
* a first stage anti-aliasing low pass filter based on RTT,
|
||||||
|
* and the time-varying Westwood filter based on inter-arrival time.
|
||||||
|
*
|
||||||
|
* Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
|
||||||
|
* Casetti et al
|
||||||
|
* (Westwood)
|
||||||
|
*
|
||||||
|
* Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
|
||||||
|
* Grieco and Mascolo
|
||||||
|
* (Westwood+)
|
||||||
|
*
|
||||||
|
* Adapted from: Linux kernel tcp_westwood.c (GPLv2)
|
||||||
|
*
|
||||||
|
* @since 0.9.46
|
||||||
|
*/
|
||||||
|
class SimpleBandwidthEstimator implements BandwidthEstimator {
|
||||||
|
|
||||||
|
private final I2PAppContext _context;
|
||||||
|
private final Log _log;
|
||||||
|
private final ConnectionOptions _opts;
|
||||||
|
|
||||||
|
private long _tAck;
|
||||||
|
// bw_est, bw_ns_est
|
||||||
|
private float _bKFiltered, _bK_ns_est;
|
||||||
|
// bk
|
||||||
|
private int _acked;
|
||||||
|
|
||||||
|
// As in kernel tcp_westwood.c
|
||||||
|
// Should probably match ConnectionOptions.TCP_ALPHA
|
||||||
|
private static final int DECAY_FACTOR = 8;
|
||||||
|
private static final int WESTWOOD_RTT_MIN = 500;
|
||||||
|
|
||||||
|
SimpleBandwidthEstimator(I2PAppContext ctx, ConnectionOptions opts) {
|
||||||
|
_log = ctx.logManager().getLog(SimpleBandwidthEstimator.class);
|
||||||
|
_context = ctx;
|
||||||
|
_opts = opts;
|
||||||
|
// assume we're about to send something
|
||||||
|
_tAck = ctx.clock().now();
|
||||||
|
_acked = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Records an arriving ack.
|
||||||
|
* @param acked how many packets were acked with this ack
|
||||||
|
*/
|
||||||
|
public synchronized void addSample(int acked) {
|
||||||
|
long now = _context.clock().now();
|
||||||
|
if (_acked < 0) {
|
||||||
|
// first sample
|
||||||
|
// use time since constructed as the RTT
|
||||||
|
// getRTT() would return zero here.
|
||||||
|
long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
|
||||||
|
float bkdt = ((float) acked) / deltaT;
|
||||||
|
_bKFiltered = bkdt;
|
||||||
|
_bK_ns_est = bkdt;
|
||||||
|
_acked = 0;
|
||||||
|
_tAck = now;
|
||||||
|
if (_log.shouldDebug())
|
||||||
|
_log.debug("first sample packets: " + acked + " deltaT: " + deltaT + ' ' + this);
|
||||||
|
} else {
|
||||||
|
_acked += acked;
|
||||||
|
// anti-aliasing filter
|
||||||
|
// As in kernel tcp_westwood.c
|
||||||
|
// and the Westwood+ paper
|
||||||
|
if (now - _tAck >= Math.max(_opts.getRTT(), WESTWOOD_RTT_MIN))
|
||||||
|
computeBWE(now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the current bandwidth estimate in packets/ms.
|
||||||
|
*/
|
||||||
|
public synchronized float getBandwidthEstimate() {
|
||||||
|
long now = _context.clock().now();
|
||||||
|
// anti-aliasing filter
|
||||||
|
// As in kernel tcp_westwood.c
|
||||||
|
// and the Westwood+ paper
|
||||||
|
if (now - _tAck >= Math.max(_opts.getRTT(), WESTWOOD_RTT_MIN))
|
||||||
|
return computeBWE(now);
|
||||||
|
return _bKFiltered;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized float computeBWE(final long now) {
|
||||||
|
if (_acked < 0)
|
||||||
|
return 0.0f; // nothing ever sampled
|
||||||
|
updateBK(now, _acked);
|
||||||
|
_acked = 0;
|
||||||
|
return _bKFiltered;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Optimized version of updateBK with packets == 0
|
||||||
|
*/
|
||||||
|
private void decay() {
|
||||||
|
_bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
|
||||||
|
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Here we insert virtual null samples if necessary as in Westwood,
|
||||||
|
* And use a very simple EWMA (exponential weighted moving average)
|
||||||
|
* time-varying filter, as in kernel tcp_westwood.c
|
||||||
|
*
|
||||||
|
* @param time the time of the measurement
|
||||||
|
* @param packets number of packets acked
|
||||||
|
*/
|
||||||
|
private void updateBK(long time, int packets) {
|
||||||
|
long deltaT = time - _tAck;
|
||||||
|
int rtt = Math.max(_opts.getRTT(), WESTWOOD_RTT_MIN);
|
||||||
|
if (deltaT > 2 * rtt) {
|
||||||
|
// Decay with virtual null samples as in the Westwood paper
|
||||||
|
int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
|
||||||
|
for (int i = 0; i < numrtts; i++) {
|
||||||
|
decay();
|
||||||
|
}
|
||||||
|
deltaT -= numrtts * rtt;
|
||||||
|
if (_log.shouldDebug())
|
||||||
|
_log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
|
||||||
|
}
|
||||||
|
float bkdt;
|
||||||
|
if (packets > 0) {
|
||||||
|
// As in kernel tcp_westwood.c
|
||||||
|
bkdt = ((float) packets) / deltaT;
|
||||||
|
_bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
|
||||||
|
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
|
||||||
|
} else {
|
||||||
|
bkdt = 0;
|
||||||
|
decay();
|
||||||
|
}
|
||||||
|
_tAck = time;
|
||||||
|
if (_log.shouldDebug())
|
||||||
|
_log.debug("computeBWE packets: " + packets + " deltaT: " + deltaT +
|
||||||
|
" bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* As in kernel tcp_westwood.c
|
||||||
|
*/
|
||||||
|
private static float westwood_do_filter(float a, float b) {
|
||||||
|
return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized String toString() {
|
||||||
|
return "SBE[" +
|
||||||
|
" _bKFiltered " + _bKFiltered +
|
||||||
|
" _tAck " + _tAck + "; " +
|
||||||
|
DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000 * _opts.getMaxMessageSize()), false) +
|
||||||
|
"Bps]";
|
||||||
|
}
|
||||||
|
}
|
18
history.txt
18
history.txt
@ -1,3 +1,21 @@
|
|||||||
|
2020-04-30 zzz
|
||||||
|
* Ratchet: Error handling fixes
|
||||||
|
* Streaming: Westwood+ congestion control (ticket #2719)
|
||||||
|
|
||||||
|
2020-04-29 zzz
|
||||||
|
* Ratchet: Increase callback timeout
|
||||||
|
* Router: Replace old data structure classes
|
||||||
|
|
||||||
|
2020-04-27 zzz
|
||||||
|
* Crypto: Reduce max ElG tagset expiration at receiver
|
||||||
|
* i2psnark:
|
||||||
|
- Don't unchoke when we don't have pieces
|
||||||
|
- Don't avoid partial pieces if there are several seeds
|
||||||
|
|
||||||
|
2020-04-25 zzz
|
||||||
|
* i2psnark: Increase min size for video preview
|
||||||
|
* Ratchet: Variable tagset lookahead/trim limits
|
||||||
|
|
||||||
2020-04-24 zzz
|
2020-04-24 zzz
|
||||||
* i2psnark: Don't mark torrent BAD on I2CP errors (ticket #2725)
|
* i2psnark: Don't mark torrent BAD on I2CP errors (ticket #2725)
|
||||||
* Logging: Log to wrapper log after log manager shutdown (ticket #2725)
|
* Logging: Log to wrapper log after log manager shutdown (ticket #2725)
|
||||||
|
@ -18,7 +18,7 @@ public class RouterVersion {
|
|||||||
/** deprecated */
|
/** deprecated */
|
||||||
public final static String ID = "Monotone";
|
public final static String ID = "Monotone";
|
||||||
public final static String VERSION = CoreVersion.VERSION;
|
public final static String VERSION = CoreVersion.VERSION;
|
||||||
public final static long BUILD = 13;
|
public final static long BUILD = 14;
|
||||||
|
|
||||||
/** for example "-test" */
|
/** for example "-test" */
|
||||||
public final static String EXTRA = "";
|
public final static String EXTRA = "";
|
||||||
|
Reference in New Issue
Block a user