* Streaming:

- When an "immediate" ack is requested, do it within
        250 ms (was 2000)
      - Request immediate acks when < 1/3 of window remains,
        or when < 3 packets remain in window,
        and every 8 packets (was when < 2 packets in window remain)
      - Change requested delay to RTT/2 (was RTO/2)
      - Log cleanup and javadoc
This commit is contained in:
zzz
2009-11-24 20:12:27 +00:00
parent e78dd1fdc3
commit 94faf74aa4
5 changed files with 78 additions and 36 deletions

View File

@ -189,9 +189,9 @@ public class Connection {
+ _activeResends + "), waiting " + timeLeft);
try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
+ "), waiting indefinitely");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
// + "), waiting indefinitely");
try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000
}
} else {
@ -297,37 +297,48 @@ public class Connection {
if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
ackOnly = true;
if (_log.shouldLog(Log.DEBUG))
_log.debug("No resend for " + packet);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("No resend for " + packet);
} else {
int remaining = 0;
int windowSize;
int remaining;
synchronized (_outboundPackets) {
_outboundPackets.put(new Long(packet.getSequenceNum()), packet);
remaining = _options.getWindowSize() - _outboundPackets.size() ;
windowSize = _options.getWindowSize();
remaining = windowSize - _outboundPackets.size() ;
_outboundPackets.notifyAll();
}
if (remaining < 0)
remaining = 0;
if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) {
// the other end has no idea what our window size is, so
// help him out by requesting acks below the 1/3 point,
// if remaining < 3, and every 8 minimum.
if (packet.isFlagSet(Packet.FLAG_CLOSE) ||
(remaining < (windowSize + 2) / 3) ||
(remaining < 3) ||
(packet.getSequenceNum() % 8 == 0)) {
packet.setOptionalDelay(0);
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting no ack delay for packet " + packet);
} else {
int delay = _options.getRTO() / 2;
// This is somewhat of a waste of time, unless the RTT < 4000,
// since the other end limits it to getSendAckDelay()
// which is always 2000, but it's good for diagnostics to see what the other end thinks
// the RTT is.
int delay = _options.getRTT() / 2;
packet.setOptionalDelay(delay);
if (delay > 0)
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
}
// WHY always set?
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
long timeout = _options.getRTO();
if (timeout > MAX_RESEND_DELAY)
timeout = MAX_RESEND_DELAY;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by"));
_log.debug("Resend in " + timeout + " for " + packet);
// schedules itself
ResendPacketEvent rpe = new ResendPacketEvent(packet, timeout);
@ -370,6 +381,10 @@ public class Connection {
}
*********/
/**
* Process the acks and nacks received in a packet
* @return List of packets acked or null
*/
List ackPackets(long ackThrough, long nacks[]) {
if (ackThrough < _highestAckedThrough) {
// dupack which won't tell us anything
@ -685,6 +700,14 @@ public class Connection {
* @return the next time the scheduler will want to send a packet, or -1 if never.
*/
public long getNextSendTime() { return _nextSendTime; }
/**
* If the next send time is currently >= 0 (i.e. not "never"),
* this may make the next time sooner but will not make it later.
* If the next send time is currently < 0 (i.e. "never"),
* this will set it to the time specified, but not later than
* options.getSendAckDelay() from now (2000 ms)
*/
public void setNextSendTime(long when) {
if (_nextSendTime >= 0) {
if (when < _nextSendTime)
@ -699,12 +722,12 @@ public class Connection {
_nextSendTime = max;
}
if (_log.shouldLog(Log.DEBUG) && false) {
if (_nextSendTime <= 0)
_log.debug("set next send time to an unknown time", new Exception(toString()));
else
_log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
}
//if (_log.shouldLog(Log.DEBUG) && false) {
// if (_nextSendTime <= 0)
// _log.debug("set next send time to an unknown time", new Exception(toString()));
// else
// _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
//}
}
/** how many packets have we sent and the other side has ACKed?
@ -742,7 +765,9 @@ public class Connection {
public long getCongestionWindowEnd() { return _congestionWindowEnd; }
public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; }
/** @return the highest outbound packet we have recieved an ack for */
public long getHighestAckedThrough() { return _highestAckedThrough; }
/** @deprecated unused */
public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; }
public long getLastActivityOn() {
@ -835,8 +860,8 @@ public class Connection {
}
long howLong = _options.getInactivityTimeout();
howLong += _randomWait; // randomize it a bit, so both sides don't do it at once
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer to " + howLong, new Exception(toString()));
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Resetting the inactivity timer to " + howLong);
// this will get rescheduled, and rescheduled, and rescheduled...
_activityTimer.reschedule(howLong, false); // use the later of current and previous timeout
}
@ -1087,6 +1112,8 @@ public class Connection {
// we want to resend this packet, but there are already active
// resends in the air and we dont want to make a bad situation
// worse. wait another second
// BUG? seq# = 0, activeResends = 0, loop forever - why?
// also seen with seq# > 0. Is the _activeResends count reliable?
if (_log.shouldLog(Log.INFO))
_log.info("Delaying resend of " + _packet + " as there are "
+ _activeResends + " active resends already in play");
@ -1104,6 +1131,7 @@ public class Connection {
_packet.setOptionalDelay(choke);
if (choke > 0)
_packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
// this seems unnecessary to send the MSS again:
_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
// bugfix release 0.7.8, we weren't dividing by 1000
_packet.setResendDelay(getOptions().getResendDelay() / 1000);

View File

@ -166,6 +166,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
packet.setReceiveStreamId(con.getReceiveStreamId());
con.getInputStream().updateAcks(packet);
// note that the optional delay is usually rewritten in Connection.sendPacket()
int choke = con.getOptions().getChoke();
packet.setOptionalDelay(choke);
if (choke > 0)
@ -197,12 +198,9 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
packet.setFlag(Packet.FLAG_CLOSE);
con.setCloseSentOn(_context.clock().now());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closed is set for a new packet on " + con + ": " + packet);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closed is not set for a new packet on " + _connection + ": " + packet);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("New outbound packet on " + _connection + ": " + packet);
return packet;
}

View File

@ -46,6 +46,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay";
public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay";
public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize";
/** unused */
public static final String PROP_INITIAL_RECEIVE_WINDOW = "i2p.streaming.initialReceiveWindow";
public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout";
public static final String PROP_INACTIVITY_ACTION = "i2p.streaming.inactivityAction";
@ -58,6 +59,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
static final int INITIAL_WINDOW_SIZE = 6;
static final int DEFAULT_MAX_SENDS = 8;
public static final int DEFAULT_INITIAL_RTT = 8*1000;
public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000;
static final int MIN_WINDOW_SIZE = 1;
private static final boolean DEFAULT_ANSWER_PINGS = true;
@ -217,7 +219,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setRTT(getInt(opts, PROP_INITIAL_RTT, DEFAULT_INITIAL_RTT));
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
@ -249,7 +251,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
if (opts.containsKey(PROP_MAX_RESENDS))
@ -295,6 +297,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
* @return if we want signatures on all packets.
*/
public boolean getRequireFullySigned() { return _fullySigned; }
/** unused, see above */
public void setRequireFullySigned(boolean sign) { _fullySigned = sign; }
/**
@ -325,7 +328,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
}
/** after how many consecutive messages should we ack?
* This doesn't appear to be used.
* @deprecated This doesn't appear to be used.
* @return receive window size.
*/
public int getReceiveWindow() { return _receiveWindow; }
@ -405,6 +408,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
* @return ACK delay in ms
*/
public int getSendAckDelay() { return _sendAckDelay; }
/**
* Unused except here, so expect the default initial delay of 2000 ms unless set by the user
* to remain constant.
*/
public void setSendAckDelay(int delayMs) { _sendAckDelay = delayMs; }
/** What is the largest message we want to send or receive?

View File

@ -131,11 +131,14 @@ public class ConnectionPacketHandler {
isNew = false;
}
if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew
+ " packet: " + packet + " con: " + con);
}
//if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
// if (_log.shouldLog(Log.DEBUG))
// _log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew
// + " packet: " + packet + " con: " + con);
//}
if (_log.shouldLog(Log.DEBUG))
_log.debug((isNew ? "New" : "Dup or ack-only") + " inbound packet on " + con + ": " + packet);
// close *after* receiving the data, as well as after verifying the signatures / etc
if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
@ -151,7 +154,9 @@ public class ConnectionPacketHandler {
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling immediate ack for " + packet);
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
//con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
// honor request "almost" immediately
con.setNextSendTime(_context.clock().now() + 250);
} else {
int delay = con.getOptions().getSendAckDelay();
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested
@ -222,6 +227,10 @@ public class ConnectionPacketHandler {
// con.fastRetransmit();
}
/**
* Process the acks in a received packet, and adjust our window and RTT
* @return are we congested?
*/
private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) {
if (ackThrough < 0) return false;
//if ( (nacks != null) && (nacks.length > 0) )
@ -287,7 +296,7 @@ public class ConnectionPacketHandler {
return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke);
}
/** @return are we congested? */
private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) {
boolean congested = false;
if ( (!isNew) && (sequenceNum > 0) ) {

View File

@ -42,7 +42,7 @@ import net.i2p.util.Log;
* <li>{@link #FLAG_MAX_PACKET_SIZE_INCLUDED}: 2 byte integer</li>
* <li>{@link #FLAG_PROFILE_INTERACTIVE}: no option data</li>
* <li>{@link #FLAG_ECHO}: no option data</li>
* <li>{@link #FLAG_NO_ACK}: no option data</li>
* <li>{@link #FLAG_NO_ACK}: no option data - this appears to be unused, we always ack, even for the first packet</li>
* </ol>
*
* <p>If the signature is included, it uses the Destination's DSA key