2005-10-24 jrandom

* Defer netDb searches for newly referenced peers until we actually want
      them
    * Ignore netDb references to peers on our shitlist
    * Set the timeout for end to end client messages to the max delay after
      finding the leaseSet, so we don't have as many expired messages floating
      around.
    * Add a floor to the streaming lib window size
    * When we need to send a streaming lib ACK, try to retransmit one of the
      unacked packets instead (with updated ACK/NACK fields, of course).  The
      bandwidth cost of an unnecessary retransmission should be minor as
      compared to both an ACK packet (rounded up to 1KB in the tunnels) and
      the probability of a necessary retransmission.
    * Adjust the streaming lib cwin algorithm to allow growth after a full
      cwin messages if the rtt is trending downwards.  If it is not, use the
      existing algorithm.
    * Increased the maximum rto size in the streaming lib.
    * Load balancing bugfix on end to end messages to distribute across
      tunnels more evenly.
This commit is contained in:
jrandom
2005-10-25 19:13:53 +00:00
committed by zzz
parent 84383c3dab
commit 4de302101d
12 changed files with 225 additions and 96 deletions

View File

@ -72,8 +72,8 @@ public class Connection {
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 8*1000;
public static final long MIN_RESEND_DELAY = 3*1000;
public static final long MAX_RESEND_DELAY = 15*1000;
public static final long MIN_RESEND_DELAY = 2*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
public static int DISCONNECT_TIMEOUT = 5*60*1000;
@ -193,7 +193,28 @@ public class Connection {
}
void ackImmediately() {
PacketLocal packet = _receiver.send(null, 0, 0);
PacketLocal packet = null;
synchronized (_outboundPackets) {
if (_outboundPackets.size() > 0) {
// ordered, so pick the lowest to retransmit
Iterator iter = _outboundPackets.values().iterator();
packet = (PacketLocal)iter.next();
//iter.remove();
}
}
if (packet != null) {
ResendPacketEvent evt = (ResendPacketEvent)packet.getResendEvent();
if (evt != null) {
boolean sent = evt.retransmit(false);
if (sent) {
return;
} else {
//SimpleTimer.getInstance().addEvent(evt, evt.getNextSendTime());
}
}
}
// if we don't have anything to retransmit, send a small ack
packet = _receiver.send(null, 0, 0);
//packet.releasePayload();
}
@ -277,7 +298,7 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by"));
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), timeout);
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet, timeout + _context.clock().now()), timeout);
}
_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize());
@ -899,18 +920,29 @@ public class Connection {
*/
private class ResendPacketEvent implements SimpleTimer.TimedEvent {
private PacketLocal _packet;
public ResendPacketEvent(PacketLocal packet) {
private long _nextSendTime;
public ResendPacketEvent(PacketLocal packet, long sendTime) {
_packet = packet;
_nextSendTime = sendTime;
packet.setResendPacketEvent(ResendPacketEvent.this);
}
public void timeReached() {
public long getNextSendTime() { return _nextSendTime; }
public void timeReached() { retransmit(true); }
/**
* Retransmit the packet if we need to.
*
* @param penalize true if this retransmission is caused by a timeout, false if we
* are just sending this packet instead of an ACK
* @return true if the packet was sent, false if it was not
*/
public boolean retransmit(boolean penalize) {
if (_packet.getAckTime() > 0)
return;
return false;
if (_resetSent || _resetReceived) {
_packet.cancelled();
return;
return false;
}
//if (_log.shouldLog(Log.DEBUG))
@ -932,7 +964,8 @@ public class Connection {
_log.warn("Delaying resend of " + _packet + " as there are "
+ _activeResends + " active resends already in play");
SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, 1000);
return;
_nextSendTime = 1000 + _context.clock().now();
return false;
}
// revamp various fields, in case we need to ack more, etc
_inputStream.updateAcks(_packet);
@ -949,7 +982,7 @@ public class Connection {
int newWindowSize = getOptions().getWindowSize();
if (_ackSinceCongestion) {
if (penalize && _ackSinceCongestion) {
// only shrink the window once per window
if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
congestionOccurred();
@ -1004,7 +1037,7 @@ public class Connection {
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}
return;
return true;
}
if (numSends - 1 > _options.getMaxResends()) {
@ -1023,11 +1056,14 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
_nextSendTime = timeout + _context.clock().now();
}
return true;
} else {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Packet acked before resend (resend="+ resend + "): "
// + _packet + " on " + Connection.this);
return false;
}
}
}

View File

@ -54,9 +54,11 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor";
private static final int TREND_COUNT = 3;
static final int INITIAL_WINDOW_SIZE = 4;
static final int INITIAL_WINDOW_SIZE = 6;
static final int DEFAULT_MAX_SENDS = 8;
static final int MIN_WINDOW_SIZE = 6;
public ConnectionOptions() {
super();
}
@ -183,6 +185,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
numMsgs = _maxWindowSize;
else if (numMsgs <= 0)
numMsgs = 1;
if (numMsgs < MIN_WINDOW_SIZE)
numMsgs = MIN_WINDOW_SIZE;
_windowSize = numMsgs;
}

View File

@ -52,6 +52,16 @@ public class ConnectionPacketHandler {
packet.releasePayload();
return;
}
if ( (con.getCloseSentOn() > 0) && (con.getUnackedPacketsSent() <= 0) &&
(packet.getSequenceNum() > 0) && (packet.getPayloadSize() > 0)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received new data when we've sent them data and all of our data is acked: "
+ packet + " on " + con + "");
con.sendReset();
packet.releasePayload();
return;
}
if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
if (packet.getOptionalMaxSize() < con.getOptions().getMaxMessageSize()) {
@ -285,8 +295,10 @@ public class ConnectionPacketHandler {
_context.statManager().addRateData("stream.trend", trend, newWindowSize);
if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
if ( (newWindowSize > con.getLastCongestionSeenAt() / 2) ||
(trend > 0) ) { // tcp vegas: avoidance if rtt is increasing, even if we arent at ssthresh/2 yet
if (trend < 0) {
// rtt is shrinking, so lets increment the cwin
newWindowSize++;
} else if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
// congestion avoidance
// we can't use newWindowSize += 1/newWindowSize, since we're

View File

@ -236,8 +236,8 @@ public class PacketHandler {
}
packet.releasePayload();
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received on an unknown stream (and not an ECHO): " + packet);
if (_log.shouldLog(Log.DEBUG) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
_log.debug("Packet received on an unknown stream (and not an ECHO or SYN): " + packet);
if (sendId <= 0) {
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {

View File

@ -101,6 +101,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cancelled! " + toString(), new Exception("cancelled"));
}
public SimpleTimer.TimedEvent getResendEvent() { return _resendEvent; }
/** how long after packet creation was it acked? */
public int getAckTime() {