2005-09-25 Complication

* Better i2paddresshelper handling in the I2PTunnel httpclient, plus a new
      conflict resolution page if the i2paddresshelper parameter differs from
      an existing name to destination mapping.
2005-09-25  jrandom
    * Fix a long standing streaming lib bug (in the inactivity detection code)
    * Improved handling of initial streaming lib packet retransmissions to
      kill the "lost first packet" bug (where a page shows up with the first
      few KB missing)
    * Add support for initial window sizes greater than 1 - useful for
      eepsites to transmit e.g. 4 packets full of data along with the initial
      ACK, thereby cutting down on the rtt latency.  The congestion window
      size can and does still shrink down to 1 packet though.
    * Adjusted the streaming lib retransmission calculation algorithm to be
      more TCP-like.
This commit is contained in:
jrandom
2005-09-25 09:28:59 +00:00
committed by zzz
parent aa9dd3e5c6
commit b9b59ff95f
18 changed files with 325 additions and 94 deletions

View File

@ -72,7 +72,7 @@ public class Connection {
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 5*1000;
public static final long MAX_RESEND_DELAY = 8*1000;
public static final long MIN_RESEND_DELAY = 3*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
@ -257,13 +257,13 @@ public class Connection {
if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) {
packet.setOptionalDelay(0);
} else {
int delay = _options.getRTT() / 2;
int delay = _options.getRTO() / 2;
packet.setOptionalDelay(delay);
_log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
}
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
long timeout = _options.getRTT() + MIN_RESEND_DELAY;
long timeout = _options.getRTO();
if (timeout > MAX_RESEND_DELAY)
timeout = MAX_RESEND_DELAY;
if (_log.shouldLog(Log.DEBUG))
@ -308,7 +308,7 @@ public class Connection {
List ackPackets(long ackThrough, long nacks[]) {
if (nacks == null) {
_highestAckedThrough = ackThrough;
_highestAckedThrough = ackThrough;
} else {
long lowest = -1;
for (int i = 0; i < nacks.length; i++) {
@ -463,7 +463,9 @@ public class Connection {
_receiver.destroy();
if (_activityTimer != null)
SimpleTimer.getInstance().removeEvent(_activityTimer);
_activityTimer = null;
//_activityTimer = null;
if (_inputStream != null)
_inputStream.streamErrorOccurred(new IOException("disconnected!"));
if (_disconnectScheduledOn < 0) {
_disconnectScheduledOn = _context.clock().now();
@ -695,11 +697,19 @@ public class Connection {
}
private void resetActivityTimer() {
if (_options.getInactivityTimeout() <= 0) return;
if (_activityTimer == null) return;
if (_options.getInactivityTimeout() <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?"));
return;
}
if (_activityTimer == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?"));
return;
}
long howLong = _activityTimer.getTimeLeft();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer to " + howLong);
_log.debug("Resetting the inactivity timer to " + howLong, new Exception("Reset by"));
// this will get rescheduled, and rescheduled, and rescheduled...
SimpleTimer.getInstance().addEvent(_activityTimer, howLong);
}
@ -707,15 +717,34 @@ public class Connection {
private class ActivityTimer implements SimpleTimer.TimedEvent {
public void timeReached() {
// uh, nothing more to do...
if (!_connected) return;
if (!_connected) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are already closed");
return;
}
// we got rescheduled already
if (getTimeLeft() > 0) return;
long left = getTimeLeft();
if (left > 0) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there is time left (" + left + ")");
SimpleTimer.getInstance().addEvent(ActivityTimer.this, left);
return;
}
// these are either going to time out or cause further rescheduling
if (getUnackedPacketsSent() > 0) return;
if (getUnackedPacketsSent() > 0) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there are unacked packets");
return;
}
// wtf, this shouldn't have been scheduled
if (_options.getInactivityTimeout() <= 0) return;
if (_options.getInactivityTimeout() <= 0) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there is no timer...");
return;
}
// if one of us can't talk...
if ( (_closeSentOn > 0) || (_closeReceivedOn > 0) ) return;
if ( (_closeSentOn > 0) || (_closeReceivedOn > 0) ) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are closing");
return;
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, with action=" + _options.getInactivityAction());
// bugger it, might as well do the hard work now
switch (_options.getInactivityAction()) {
@ -741,7 +770,9 @@ public class Connection {
_log.debug(buf.toString());
}
disconnect(true);
_inputStream.streamErrorOccurred(new IOException("Inactivity timeout"));
_outputStream.streamErrorOccurred(new IOException("Inactivity timeout"));
disconnect(false);
break;
}
}
@ -774,6 +805,7 @@ public class Connection {
buf.append(" wsize: ").append(_options.getWindowSize());
buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough);
buf.append(" rtt: ").append(_options.getRTT());
buf.append(" rto: ").append(_options.getRTO());
// not synchronized to avoid some kooky races
buf.append(" unacked outbound: ").append(_outboundPackets.size()).append(" ");
/*
@ -950,10 +982,10 @@ public class Connection {
disconnect(false);
} else {
//long timeout = _options.getResendDelay() << numSends;
long rtt = _options.getRTT();
if (rtt < MIN_RESEND_DELAY)
rtt = MIN_RESEND_DELAY;
long timeout = rtt << (numSends-1);
long rto = _options.getRTO();
if (rto < MIN_RESEND_DELAY)
rto = MIN_RESEND_DELAY;
long timeout = rto << (numSends-1);
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
timeout = MAX_RESEND_DELAY;
if (_log.shouldLog(Log.DEBUG))

View File

@ -2,6 +2,7 @@ package net.i2p.client.streaming;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
@ -166,6 +167,9 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
packet.setOptionalFrom(con.getSession().getMyDestination());
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
}
if (DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) {
packet.setFlag(Packet.FLAG_NO_ACK);
}
// don't set the closed flag if this is a plain ACK and there are outstanding
// packets sent, otherwise the other side could receive the CLOSE prematurely,

View File

@ -13,6 +13,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
private int _receiveWindow;
private int _profile;
private int _rtt;
private int _rttDev;
private int _rto;
private int _trend[];
private int _resendDelay;
private int _sendAckDelay;
@ -52,6 +54,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor";
private static final int TREND_COUNT = 3;
static final int INITIAL_WINDOW_SIZE = 4;
public ConnectionOptions() {
super();
@ -68,6 +71,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public ConnectionOptions(ConnectionOptions opts) {
super(opts);
if (opts != null) {
setMaxWindowSize(opts.getMaxWindowSize());
setConnectDelay(opts.getConnectDelay());
setProfile(opts.getProfile());
setRTT(opts.getRTT());
@ -80,7 +84,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setInactivityTimeout(opts.getInactivityTimeout());
setInactivityAction(opts.getInactivityAction());
setInboundBufferSize(opts.getInboundBufferSize());
setMaxWindowSize(opts.getMaxWindowSize());
setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor());
setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor());
}
@ -90,6 +93,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
super.init(opts);
_trend = new int[TREND_COUNT];
setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE));
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, 4*1024));
@ -97,22 +101,23 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 2*60*1000));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1));
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1));
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE));
}
public void setProperties(Properties opts) {
super.setProperties(opts);
if (opts == null) return;
if (opts.containsKey(PROP_MAX_WINDOW_SIZE))
setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE));
if (opts.containsKey(PROP_CONNECT_DELAY))
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
if (opts.containsKey(PROP_PROFILE))
@ -124,17 +129,17 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW))
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 500));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
if (opts.containsKey(PROP_MAX_RESENDS))
setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10));
if (opts.containsKey(PROP_WRITE_TIMEOUT))
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 2*60*1000));
if (opts.containsKey(PROP_INACTIVITY_ACTION))
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
@ -145,8 +150,6 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
if (opts.containsKey(PROP_CONNECT_TIMEOUT))
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
if (opts.containsKey(PROP_MAX_WINDOW_SIZE))
setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE));
}
/**
@ -191,6 +194,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
*/
public int getRTT() { return _rtt; }
public void setRTT(int ms) {
if (_rto == 0) {
_rttDev = ms;
_rto = (int)Connection.MAX_RESEND_DELAY;
}
synchronized (_trend) {
_trend[0] = _trend[1];
_trend[1] = _trend[2];
@ -201,10 +208,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
else
_trend[2] = 0;
}
_rtt = ms;
if (_rtt > 60*1000)
_rtt = 60*1000;
}
public int getRTO() { return _rto; }
/**
* If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have
@ -225,7 +234,15 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
private static final double RTT_DAMPENING = 0.9;
public void updateRTT(int measuredValue) {
setRTT((int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue));
_rttDev = _rttDev + (int)(0.25d*(Math.abs(measuredValue-_rtt)-_rttDev));
int smoothed = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue);
_rto = smoothed + (_rttDev<<2);
if (_rto < Connection.MIN_RESEND_DELAY)
_rto = (int)Connection.MIN_RESEND_DELAY;
else if (_rto > Connection.MAX_RESEND_DELAY)
_rto = (int)Connection.MAX_RESEND_DELAY;
setRTT(smoothed);
}
/** How long after sending a packet will we wait before resending? */

View File

@ -187,6 +187,7 @@ public class ConnectionPacketHandler {
}
private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) {
if (ackThrough < 0) return false;
//if ( (nacks != null) && (nacks.length > 0) )
// con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000);
@ -315,6 +316,12 @@ public class ConnectionPacketHandler {
return congested;
}
/**
* If we don't know the send stream id yet (we're just creating a connection), allow
* the first three packets to come in. The first of those should be the SYN, of course...
*/
private static final int MAX_INITIAL_PACKETS = ConnectionOptions.INITIAL_WINDOW_SIZE;
/**
* Make sure this packet is ok and that we can continue processing its data.
*
@ -335,7 +342,7 @@ public class ConnectionPacketHandler {
return true;
} else {
// neither RST nor SYN and we dont have the stream id yet?
if (packet.getSequenceNum() <= 2) {
if (packet.getSequenceNum() < MAX_INITIAL_PACKETS) {
return true;
} else {
if (_log.shouldLog(Log.ERROR))

View File

@ -44,6 +44,7 @@ public class MessageHandler implements I2PSessionListener {
_log.warn("Error receiving the message", ise);
return;
}
if (data == null) return;
Packet packet = new Packet();
try {
packet.readPacket(data, 0, data.length);

View File

@ -435,7 +435,9 @@ public class MessageInputStream extends InputStream {
*
*/
void streamErrorOccurred(IOException ioe) {
_streamError = ioe;
if (_streamError == null)
_streamError = ioe;
_locallyClosed = true;
synchronized (_dataLock) {
_dataLock.notifyAll();
}

View File

@ -312,11 +312,16 @@ public class MessageOutputStream extends OutputStream {
/** nonblocking close */
public void closeInternal() {
_closed = true;
_streamError = new IOException("Closed internally");
if (_streamError == null)
_streamError = new IOException("Closed internally");
clearData(true);
}
private void clearData(boolean shouldFlush) {
ByteArray ba = null;
synchronized (_dataLock) {
// flush any data, but don't wait for it
if (_dataReceiver != null)
if ( (_dataReceiver != null) && (_valid > 0) && shouldFlush)
_dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
@ -345,7 +350,9 @@ public class MessageOutputStream extends OutputStream {
}
void streamErrorOccurred(IOException ioe) {
_streamError = ioe;
if (_streamError == null)
_streamError = ioe;
clearData(false);
}
/**

View File

@ -135,6 +135,11 @@ public class Packet {
* ping reply (if receiveStreamId is set).
*/
public static final int FLAG_ECHO = (1 << 9);
/**
* If set, this packet doesn't really want to ack anything
*/
public static final int FLAG_NO_ACK = (1 << 10);
public static final int DEFAULT_MAX_SIZE = 32*1024;
private static final int MAX_DELAY_REQUEST = 65535;
@ -181,11 +186,21 @@ public class Packet {
/**
* The highest packet sequence number that received
* on the receiveStreamId. This field is ignored on the initial
* connection packet (where receiveStreamId is the unknown id).
* connection packet (where receiveStreamId is the unknown id) or
* if FLAG_NO_ACK is set.
*
*/
public long getAckThrough() { return _ackThrough; }
public void setAckThrough(long id) { _ackThrough = id; }
public long getAckThrough() {
if (isFlagSet(FLAG_NO_ACK))
return -1;
else
return _ackThrough;
}
public void setAckThrough(long id) {
if (id < 0)
setFlag(FLAG_NO_ACK);
_ackThrough = id;
}
/**
* List of packet sequence numbers below the getAckThrough() value
@ -566,7 +581,7 @@ public class Packet {
else
buf.append('\t');
buf.append(toFlagString());
buf.append(" ACK ").append(_ackThrough);
buf.append(" ACK ").append(getAckThrough());
if (_nacks != null) {
buf.append(" NACK");
for (int i = 0; i < _nacks.length; i++) {

View File

@ -22,19 +22,26 @@ public class PacketHandler {
private I2PAppContext _context;
private Log _log;
private int _lastDelay;
private int _dropped;
public PacketHandler(I2PAppContext ctx, ConnectionManager mgr) {
_manager = mgr;
_context = ctx;
_dropped = 0;
_log = ctx.logManager().getLog(PacketHandler.class);
_lastDelay = _context.random().nextInt(30*1000);
}
private boolean choke(Packet packet) {
if (false) {
// artificial choke: 2% random drop and a 0-30s
private boolean choke(Packet packet) {
if (true) return true;
//if ( (_dropped == 0) && true ) { //&& (_manager.getSent() <= 0) ) {
// _dropped++;
// return false;
//}
if (true) {
// artificial choke: 2% random drop and a 0-5s
// random tiered delay from 0-30s
if (_context.random().nextInt(100) >= 95) {
if (_context.random().nextInt(100) >= 98) {
displayPacket(packet, "DROP", null);
return false;
} else {
@ -42,7 +49,7 @@ public class PacketHandler {
/*
int delay = _context.random().nextInt(5*1000);
*/
int delay = _context.random().nextInt(6*1000);
int delay = _context.random().nextInt(1*1000);
int delayFactor = _context.random().nextInt(100);
if (delayFactor > 80) {
if (delayFactor > 98)
@ -97,7 +104,7 @@ public class PacketHandler {
Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) {
receiveKnownCon(con, packet);
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize());
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO());
} else {
receiveUnknownCon(packet, sendId);
displayPacket(packet, "UNKN", null);

View File

@ -125,7 +125,7 @@ class PacketQueue {
_log.debug(msg);
}
Connection c = packet.getConnection();
String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() : null);
String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null);
_connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix);
}