* you mean we should implement congestion *avoidance* too?

* ack properly on duplicates
* set the message input stream buffer large enough to fit the max window (duh)
This commit is contained in:
jrandom
2004-11-09 13:26:10 +00:00
committed by zzz
parent 83165df7e5
commit 73a12d47de
3 changed files with 50 additions and 8 deletions

View File

@ -56,6 +56,9 @@ public class Connection {
private boolean _disconnectScheduled; private boolean _disconnectScheduled;
private long _lastReceivedOn; private long _lastReceivedOn;
private ActivityTimer _activityTimer; private ActivityTimer _activityTimer;
/** window size when we last saw congestion */
private int _lastCongestionSeenAt;
private boolean _ackSinceCongestion;
public static final long MAX_RESEND_DELAY = 60*1000; public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000; public static final long MIN_RESEND_DELAY = 20*1000;
@ -63,6 +66,9 @@ public class Connection {
/** wait up to 5 minutes after disconnection so we can ack/close packets */ /** wait up to 5 minutes after disconnection so we can ack/close packets */
public static long DISCONNECT_TIMEOUT = 5*60*1000; public static long DISCONNECT_TIMEOUT = 5*60*1000;
/** lets be sane.. no more than 32 packets in the air in each dir */
public static final int MAX_WINDOW_SIZE = 32;
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) {
this(ctx, manager, chooser, queue, handler, null); this(ctx, manager, chooser, queue, handler, null);
} }
@ -86,12 +92,14 @@ public class Connection {
_unackedPacketsReceived = 0; _unackedPacketsReceived = 0;
_congestionWindowEnd = 0; _congestionWindowEnd = 0;
_highestAckedThrough = -1; _highestAckedThrough = -1;
_lastCongestionSeenAt = MAX_WINDOW_SIZE;
_connectionManager = manager; _connectionManager = manager;
_resetReceived = false; _resetReceived = false;
_connected = true; _connected = true;
_disconnectScheduled = false; _disconnectScheduled = false;
_lastReceivedOn = -1; _lastReceivedOn = -1;
_activityTimer = new ActivityTimer(); _activityTimer = new ActivityTimer();
_ackSinceCongestion = true;
} }
public long getNextOutboundPacketNum() { public long getNextOutboundPacketNum() {
@ -275,6 +283,8 @@ public class Connection {
} }
_outboundPackets.notifyAll(); _outboundPackets.notifyAll();
} }
if ((acked != null) && (acked.size() > 0) )
_ackSinceCongestion = true;
return acked; return acked;
} }
@ -438,6 +448,17 @@ public class Connection {
return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn); return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn);
} }
public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; }
void congestionOccurred() {
// if we hit congestion and e.g. 5 packets are resent,
// dont set the size to (winSize >> 4). only set the
if (_ackSinceCongestion) {
_lastCongestionSeenAt = _options.getWindowSize();
_ackSinceCongestion = false;
}
}
void packetReceived() { void packetReceived() {
_lastReceivedOn = _context.clock().now(); _lastReceivedOn = _context.clock().now();
resetActivityTimer(); resetActivityTimer();
@ -563,6 +584,7 @@ public class Connection {
// shrink the window // shrink the window
int newWindowSize = getOptions().getWindowSize(); int newWindowSize = getOptions().getWindowSize();
_lastCongestionSeenAt = newWindowSize;
newWindowSize /= 2; newWindowSize /= 2;
if (newWindowSize <= 0) if (newWindowSize <= 0)
newWindowSize = 1; newWindowSize = 1;

View File

@ -74,7 +74,7 @@ public class ConnectionOptions extends I2PSocketOptions {
setWriteTimeout(-1); setWriteTimeout(-1);
setInactivityTimeout(5*60*1000); setInactivityTimeout(5*60*1000);
setInactivityAction(INACTIVITY_ACTION_SEND); setInactivityAction(INACTIVITY_ACTION_SEND);
setInboundBufferSize(256*1024); setInboundBufferSize((Packet.MAX_PAYLOAD_SIZE + 2) * Connection.MAX_WINDOW_SIZE);
} }
} }
@ -103,15 +103,14 @@ public class ConnectionOptions extends I2PSocketOptions {
public boolean getRequireFullySigned() { return _fullySigned; } public boolean getRequireFullySigned() { return _fullySigned; }
public void setRequireFullySigned(boolean sign) { _fullySigned = sign; } public void setRequireFullySigned(boolean sign) { _fullySigned = sign; }
private static final int MAX_WINDOW_SIZE = 32;
/** /**
* How many messages will we send before waiting for an ACK? * How many messages will we send before waiting for an ACK?
* *
*/ */
public int getWindowSize() { return _windowSize; } public int getWindowSize() { return _windowSize; }
public void setWindowSize(int numMsgs) { public void setWindowSize(int numMsgs) {
if (numMsgs > MAX_WINDOW_SIZE) if (numMsgs > Connection.MAX_WINDOW_SIZE)
numMsgs = MAX_WINDOW_SIZE; numMsgs = Connection.MAX_WINDOW_SIZE;
_windowSize = numMsgs; _windowSize = numMsgs;
} }

View File

@ -124,10 +124,17 @@ public class ConnectionPacketHandler {
if ( (!isNew) && (sequenceNum > 0) ) { if ( (!isNew) && (sequenceNum > 0) ) {
// dup real packet // dup real packet
int oldSize = con.getOptions().getWindowSize(); int oldSize = con.getOptions().getWindowSize();
con.congestionOccurred();
oldSize >>>= 1; oldSize >>>= 1;
if (oldSize <= 0) if (oldSize <= 0)
oldSize = 1; oldSize = 1;
con.getOptions().setWindowSize(oldSize); con.getOptions().setWindowSize(oldSize);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Congestion occurred - new windowSize " + oldSize + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con);
return true; return true;
} else if (numResends > 0) { } else if (numResends > 0) {
// window sizes are shrunk on resend, not on ack // window sizes are shrunk on resend, not on ack
@ -137,9 +144,23 @@ public class ConnectionPacketHandler {
if (lowest >= con.getCongestionWindowEnd()) { if (lowest >= con.getCongestionWindowEnd()) {
// new packet that ack'ed uncongested data, or an empty ack // new packet that ack'ed uncongested data, or an empty ack
int newWindowSize = con.getOptions().getWindowSize(); int newWindowSize = con.getOptions().getWindowSize();
newWindowSize += 1; // acked; // 1
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
// congestion avoidance
// we can't use newWindowSize += 1/newWindowSize, since we're
// integers, so lets use a random distribution instead
int shouldIncrement = _context.random().nextInt(newWindowSize);
if (shouldIncrement <= 0)
newWindowSize += 1;
} else {
// slow start
newWindowSize += 1;
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("New window size " + newWindowSize + " (#resends: " + numResends _log.debug("New window size " + newWindowSize + " congestionSeenAt: "
+ con.getLastCongestionSeenAt() + " (#resends: " + numResends
+ ") for " + con); + ") for " + con);
con.getOptions().setWindowSize(newWindowSize); con.getOptions().setWindowSize(newWindowSize);
con.setCongestionWindowEnd(newWindowSize + lowest); con.setCongestionWindowEnd(newWindowSize + lowest);
@ -267,9 +288,9 @@ public class ConnectionPacketHandler {
_con = con; _con = con;
} }
public void timeReached() { public void timeReached() {
if (_con.getLastActivityOn() <= _created) { if (_con.getLastSendTime() <= _created) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Last activity was a while ago, and we want to ack a dup"); _log.debug("Last sent was a while ago, and we want to ack a dup");
// we haven't done anything since receiving the dup, send an // we haven't done anything since receiving the dup, send an
// ack now // ack now
_con.ackImmediately(); _con.ackImmediately();