Streaming: Fix optional delay and choking (tickets #1046, 1939)

- Don't always send optional delay
- Don't overwrite choking delay with non-choking delay
- Don't send optional delay of 0 every 8 packets
- Don't set options both in CDR.buildPacket() and Conn.sendPacket()
- Set or clear optional delay in packet when retransmitting
- Move choking state variables from ConnectionOptions to Connection
- Move updateAcks() call from PacketLocal to PacketQueue
- Fully implement choking and un-choking
- Reduce periods for some stats
- Comment out some debug logging
- Cleanups
- Fix javadoc HTML broken in previous checkin
This commit is contained in:
zzz
2017-02-09 17:24:03 +00:00
parent f0241d4a1c
commit 2d8f0c2956
14 changed files with 224 additions and 116 deletions

View File

@ -69,6 +69,11 @@ class Connection {
private int _lastCongestionSeenAt; private int _lastCongestionSeenAt;
private long _lastCongestionTime; private long _lastCongestionTime;
private volatile long _lastCongestionHighestUnacked; private volatile long _lastCongestionHighestUnacked;
/** has the other side choked us? */
private volatile boolean _isChoked;
/** are we choking the other side? */
private volatile boolean _isChoking;
private final AtomicInteger _unchokesToSend = new AtomicInteger();
private final AtomicBoolean _ackSinceCongestion; private final AtomicBoolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */ /** Notify this on connection (or connection failure) */
private final Object _connectLock; private final Object _connectLock;
@ -102,6 +107,7 @@ class Connection {
private static final long MAX_CONNECT_TIMEOUT = 2*60*1000; private static final long MAX_CONNECT_TIMEOUT = 2*60*1000;
public static final int MAX_WINDOW_SIZE = 128; public static final int MAX_WINDOW_SIZE = 128;
private static final int UNCHOKES_TO_SEND = 8;
/**** /****
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
@ -187,7 +193,7 @@ class Connection {
long timeLeft = writeExpire - _context.clock().now(); long timeLeft = writeExpire - _context.clock().now();
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
if (!started) if (!started)
_context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size(), timeoutMs); _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size());
if (start + 5*60*1000 < _context.clock().now()) // ok, 5 minutes blocking? I dont think so if (start + 5*60*1000 < _context.clock().now()) // ok, 5 minutes blocking? I dont think so
return false; return false;
@ -205,20 +211,20 @@ class Connection {
// Limit (highest-lowest) to twice the window (if far end doesn't like it, it can send a choke) // Limit (highest-lowest) to twice the window (if far end doesn't like it, it can send a choke)
int unacked = _outboundPackets.size(); int unacked = _outboundPackets.size();
int wsz = _options.getWindowSize(); int wsz = _options.getWindowSize();
if (unacked >= wsz || if (_isChoked || unacked >= wsz ||
_activeResends.get() >= (wsz + 1) / 2 || _activeResends.get() >= (wsz + 1) / 2 ||
_lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) { _lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) {
if (timeoutMs > 0) { if (timeoutMs > 0) {
if (timeLeft <= 0) { if (timeLeft <= 0) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Outbound window is full " + unacked _log.info("Outbound window is full (choked? " + _isChoked + ' ' + unacked
+ " unacked with " + _activeResends + " active resends" + " unacked with " + _activeResends + " active resends"
+ " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): " + " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): "
+ toString()); + toString());
return false; return false;
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + unacked + "/" + wsz + "/" _log.debug("Outbound window is full (choked? " + _isChoked + ' ' + unacked + '/' + wsz + '/'
+ _activeResends + "), waiting " + timeLeft); + _activeResends + "), waiting " + timeLeft);
try { try {
_outboundPackets.wait(Math.min(timeLeft,250l)); _outboundPackets.wait(Math.min(timeLeft,250l));
@ -240,7 +246,7 @@ class Connection {
} //10*1000 } //10*1000
} }
} else { } else {
_context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start); _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size());
return true; return true;
} }
} }
@ -257,7 +263,7 @@ class Connection {
} }
void ackImmediately() { void ackImmediately() {
PacketLocal packet = null; PacketLocal packet;
/*** why would we do this? /*** why would we do this?
was it to force a congestion indication at the other end? was it to force a congestion indication at the other end?
an expensive way to do that... an expensive way to do that...
@ -343,6 +349,12 @@ class Connection {
} }
} }
/**
* This sends all 'normal' packets (acks and data) for the first time.
* Retransmits are done in ResendPacketEvent below.
* Resets, pings, and pongs are done elsewhere in this class,
* or in ConnectionManager or ConnectionHandler.
*/
void sendPacket(PacketLocal packet) { void sendPacket(PacketLocal packet) {
if (packet == null) return; if (packet == null) return;
@ -353,8 +365,15 @@ class Connection {
} }
if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
//if (_log.shouldLog(Log.DEBUG)) // ACK-only
// _log.debug("No resend for " + packet); if (_isChoking) {
packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
} else if (_unchokesToSend.decrementAndGet() > 0) {
// don't worry about wrapping around
packet.setOptionalDelay(0);
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
}
} else { } else {
int windowSize; int windowSize;
int remaining; int remaining;
@ -364,13 +383,18 @@ class Connection {
remaining = windowSize - _outboundPackets.size() ; remaining = windowSize - _outboundPackets.size() ;
_outboundPackets.notifyAll(); _outboundPackets.notifyAll();
} }
// the other end has no idea what our window size is, so
// help him out by requesting acks below the 1/3 point, if (_isChoking) {
// if remaining < 3, and every 8 minimum. packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
if (packet.isFlagSet(Packet.FLAG_CLOSE) || packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
(remaining < (windowSize + 2) / 3) || } else if (packet.isFlagSet(Packet.FLAG_CLOSE) ||
_unchokesToSend.decrementAndGet() > 0 ||
// the other end has no idea what our window size is, so
// help him out by requesting acks below the 1/3 point,
// if remaining < 3, and every 8 minimum.
(remaining < 3) || (remaining < 3) ||
(packet.getSequenceNum() % 8 == 0)) { (remaining < (windowSize + 2) / 3) /* ||
(packet.getSequenceNum() % 8 == 0) */ ) {
packet.setOptionalDelay(0); packet.setOptionalDelay(0);
packet.setFlag(Packet.FLAG_DELAY_REQUESTED); packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
@ -380,15 +404,15 @@ class Connection {
// since the other end limits it to getSendAckDelay() // since the other end limits it to getSendAckDelay()
// which is always 2000, but it's good for diagnostics to see what the other end thinks // which is always 2000, but it's good for diagnostics to see what the other end thinks
// the RTT is. // the RTT is.
/**
int delay = _options.getRTT() / 2; int delay = _options.getRTT() / 2;
packet.setOptionalDelay(delay); packet.setOptionalDelay(delay);
if (delay > 0) if (delay > 0)
packet.setFlag(Packet.FLAG_DELAY_REQUESTED); packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
**/
} }
// WHY always set?
//packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
long timeout = _options.getRTO(); long timeout = _options.getRTO();
if (timeout > MAX_RESEND_DELAY) if (timeout > MAX_RESEND_DELAY)
@ -984,6 +1008,55 @@ class Connection {
} }
} }
/**
* Set or clear if we are choking the other side.
* If on is true or the value has changed, this will call ackImmediately().
* @param on true for choking
* @since 0.9.29
*/
public void setChoking(boolean on) {
if (on != _isChoking) {
_isChoking = on;
if (!on)
_unchokesToSend.set(UNCHOKES_TO_SEND);
ackImmediately();
} else if (on) {
ackImmediately();
}
}
/**
* Set or clear if we are being choked by the other side.
* @param on true for choked
* @since 0.9.29
*/
public void setChoked(boolean on) {
_isChoked = on;
if (on) {
congestionOccurred();
// https://en.wikipedia.org/wiki/Transmission_Control_Protocol
// When a receiver advertises a window size of 0, the sender stops sending data and starts the persist timer.
// The persist timer is used to protect TCP from a deadlock situation that could arise
// if a subsequent window size update from the receiver is lost,
// and the sender cannot send more data until receiving a new window size update from the receiver.
// When the persist timer expires, the TCP sender attempts recovery by sending a small packet
// so that the receiver responds by sending another acknowledgement containing the new window size.
// ...
// We don't do any of that, but we set the window size to 1, and let the retransmission
// of packets do the "attempted recovery".
getOptions().setWindowSize(1);
}
}
/**
* Is the other side choking us?
* @return if choked
* @since 0.9.29
*/
public boolean isChoked() {
return _isChoked;
}
/** how many packets have we sent and the other side has ACKed? /** how many packets have we sent and the other side has ACKed?
* @return Count of how many packets ACKed. * @return Count of how many packets ACKed.
*/ */
@ -1381,10 +1454,18 @@ class Connection {
// revamp various fields, in case we need to ack more, etc // revamp various fields, in case we need to ack more, etc
// updateAcks done in enqueue() // updateAcks done in enqueue()
//_inputStream.updateAcks(_packet); //_inputStream.updateAcks(_packet);
int choke = getOptions().getChoke(); if (_isChoking) {
_packet.setOptionalDelay(choke); _packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
if (choke > 0)
_packet.setFlag(Packet.FLAG_DELAY_REQUESTED); _packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
} else if (_unchokesToSend.decrementAndGet() > 0) {
// don't worry about wrapping around
_packet.setOptionalDelay(0);
_packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
} else {
// clear flag
_packet.setFlag(Packet.FLAG_DELAY_REQUESTED, false);
}
// this seems unnecessary to send the MSS again: // this seems unnecessary to send the MSS again:
//_packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
// bugfix release 0.7.8, we weren't dividing by 1000 // bugfix release 0.7.8, we weren't dividing by 1000
@ -1396,7 +1477,10 @@ class Connection {
int newWindowSize = getOptions().getWindowSize(); int newWindowSize = getOptions().getWindowSize();
if (_ackSinceCongestion.get()) { if (_isChoked) {
congestionOccurred();
getOptions().setWindowSize(1);
} else if (_ackSinceCongestion.get()) {
// only shrink the window once per window // only shrink the window once per window
if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) { if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
congestionOccurred(); congestionOccurred();

View File

@ -156,6 +156,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
} }
/** /**
* Compose a packet.
* Most flags are set here; however, some are set in Connection.sendPacket()
* and Connection.ResendPacketEvent.retransmit().
* Take care not to set the same options both here and in Connection.
*
* @param buf data to be sent - may be null * @param buf data to be sent - may be null
* @param off offset into the buffer to start writing from * @param off offset into the buffer to start writing from
* @param size how many bytes of the buffer to write (may be 0) * @param size how many bytes of the buffer to write (may be 0)
@ -164,12 +169,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* @return the packet to be sent * @return the packet to be sent
*/ */
private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) { private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) {
Connection con = _connection;
if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")"); if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")");
boolean ackOnly = isAckOnly(con, size); boolean ackOnly = isAckOnly(_connection, size);
boolean isFirst = (con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0); boolean isFirst = (_connection.getAckedPackets() <= 0) && (_connection.getUnackedPacketsSent() <= 0);
PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con); PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection);
//ByteArray data = packet.acquirePayload(); //ByteArray data = packet.acquirePayload();
ByteArray data = new ByteArray(new byte[size]); ByteArray data = new ByteArray(new byte[size]);
if (size > 0) if (size > 0)
@ -180,36 +184,32 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if ( (ackOnly && !forceIncrement) && (!isFirst) ) if ( (ackOnly && !forceIncrement) && (!isFirst) )
packet.setSequenceNum(0); packet.setSequenceNum(0);
else else
packet.setSequenceNum(con.getNextOutboundPacketNum()); packet.setSequenceNum(_connection.getNextOutboundPacketNum());
packet.setSendStreamId(con.getSendStreamId()); packet.setSendStreamId(_connection.getSendStreamId());
packet.setReceiveStreamId(con.getReceiveStreamId()); packet.setReceiveStreamId(_connection.getReceiveStreamId());
// not needed here, handled in PacketQueue.enqueue() // not needed here, handled in PacketQueue.enqueue()
//con.getInputStream().updateAcks(packet); //con.getInputStream().updateAcks(packet);
// note that the optional delay is usually rewritten in Connection.sendPacket()
int choke = con.getOptions().getChoke(); // Do not set optional delay here, set in Connection.sendPacket()
packet.setOptionalDelay(choke);
if (choke > 0)
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
// bugfix release 0.7.8, we weren't dividing by 1000 // bugfix release 0.7.8, we weren't dividing by 1000
packet.setResendDelay(con.getOptions().getResendDelay() / 1000); packet.setResendDelay(_connection.getOptions().getResendDelay() / 1000);
if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) if (_connection.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)
packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true); packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true);
else else
packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false); packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false);
packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, con.getOptions().getRequireFullySigned());
//if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) { //if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) {
if (isFirst) { if (isFirst) {
packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setFlag(Packet.FLAG_SYNCHRONIZE);
packet.setOptionalFrom(); packet.setOptionalFrom();
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize());
} }
packet.setLocalPort(con.getLocalPort()); packet.setLocalPort(_connection.getLocalPort());
packet.setRemotePort(con.getPort()); packet.setRemotePort(_connection.getPort());
if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { if (_connection.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {
packet.setFlag(Packet.FLAG_NO_ACK); packet.setFlag(Packet.FLAG_NO_ACK);
} }
@ -221,10 +221,10 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
// FIXME Implement better half-close by sending CLOSE whenever. Needs 0.9.9 bug fixes // FIXME Implement better half-close by sending CLOSE whenever. Needs 0.9.9 bug fixes
// throughout network? // throughout network?
// //
if (con.getOutputStream().getClosed() && if (_connection.getOutputStream().getClosed() &&
( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { ( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
packet.setFlag(Packet.FLAG_CLOSE); packet.setFlag(Packet.FLAG_CLOSE);
con.notifyCloseSent(); _connection.notifyCloseSent();
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("New OB pkt (acks not yet filled in): " + packet + " on " + _connection); _log.debug("New OB pkt (acks not yet filled in): " + packet + " on " + _connection);

View File

@ -105,13 +105,13 @@ class ConnectionManager {
_context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
// Stats for Connection // Stats for Connection
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*60*1000 });
_context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 10*60*1000 });
// Stats for PacketQueue // Stats for PacketQueue
_context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 });
} }
Connection getConnectionByInboundId(long id) { Connection getConnectionByInboundId(long id) {

View File

@ -33,7 +33,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private int _resendDelay; private int _resendDelay;
private int _sendAckDelay; private int _sendAckDelay;
private int _maxMessageSize; private int _maxMessageSize;
private int _choke;
private int _maxResends; private int _maxResends;
private int _inactivityTimeout; private int _inactivityTimeout;
private int _inactivityAction; private int _inactivityAction;
@ -327,7 +326,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setWindowSize(opts.getWindowSize()); setWindowSize(opts.getWindowSize());
setResendDelay(opts.getResendDelay()); setResendDelay(opts.getResendDelay());
setMaxMessageSize(opts.getMaxMessageSize()); setMaxMessageSize(opts.getMaxMessageSize());
setChoke(opts.getChoke());
setMaxResends(opts.getMaxResends()); setMaxResends(opts.getMaxResends());
setInactivityTimeout(opts.getInactivityTimeout()); setInactivityTimeout(opts.getInactivityTimeout());
setInactivityAction(opts.getInactivityAction()); setInactivityAction(opts.getInactivityAction());
@ -677,15 +675,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public int getMaxMessageSize() { return _maxMessageSize; } public int getMaxMessageSize() { return _maxMessageSize; }
public void setMaxMessageSize(int bytes) { _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE); } public void setMaxMessageSize(int bytes) { _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE); }
/**
* how long we want to wait before any data is transferred on the
* connection in either direction
*
* @return how long to wait before any data is transferred in either direction in ms
*/
public int getChoke() { return _choke; }
public void setChoke(int ms) { _choke = ms; }
/** /**
* What profile do we want to use for this connection? * What profile do we want to use for this connection?
* TODO: Only bulk is supported so far. * TODO: Only bulk is supported so far.

View File

@ -96,23 +96,27 @@ class ConnectionPacketHandler {
boolean choke = false; boolean choke = false;
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) { if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) {
if (packet.getOptionalDelay() > 60000) { if (packet.getOptionalDelay() >= Packet.MIN_DELAY_CHOKE) {
// requested choke // requested choke
choke = true; choke = true;
if (_log.shouldWarn())
_log.warn("Got a choke on connection " + con + ": " + packet);
//con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
} }
// Only call this if the flag is set
con.setChoked(choke);
} }
if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) { if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) {
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("Inbound buffer exceeded on connection " + con + _log.warn("Inbound buffer exceeded on connection " + con +
", dropping " + packet); ", choking and dropping " + packet);
con.getOptions().setChoke(61*1000); // this will call ackImmediately()
con.setChoking(true);
// TODO we could still process the acks for this packet before discarding
packet.releasePayload(); packet.releasePayload();
con.ackImmediately();
return; return;
} } // else we will call setChoking(false) below
con.getOptions().setChoke(0);
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize()); _context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize());
@ -132,12 +136,20 @@ class ConnectionPacketHandler {
// MessageInputStream will know the last sequence number. // MessageInputStream will know the last sequence number.
// But not ack-only packets! // But not ack-only packets!
boolean isNew; boolean isNew;
if (seqNum > 0 || isSYN) if (seqNum > 0 || isSYN) {
isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()); isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()) &&
else !allowAck;
isNew = false; } else {
if (!allowAck)
isNew = false; isNew = false;
}
if (isNew && packet.getPayloadSize() > 1500) {
// don't clear choking unless it was new, and a big packet
// this will call ackImmediately() if changed
// TODO if this filled in a hole, we shouldn't unchoke
// TODO a bunch of small packets should unchoke also
con.setChoking(false);
}
//if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { //if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
// if (_log.shouldLog(Log.DEBUG)) // if (_log.shouldLog(Log.DEBUG))
@ -158,7 +170,6 @@ class ConnectionPacketHandler {
_log.debug(type + " IB pkt: " + packet + " on " + con); _log.debug(type + " IB pkt: " + packet + " on " + con);
} }
boolean fastAck = false;
boolean ackOnly = false; boolean ackOnly = false;
if (isNew) { if (isNew) {
@ -170,6 +181,9 @@ class ConnectionPacketHandler {
_log.debug("Scheduling immediate ack for " + packet); _log.debug("Scheduling immediate ack for " + packet);
//con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
// honor request "almost" immediately // honor request "almost" immediately
// TODO the 250 below _may_ be a big limiter in how fast local "loopback" connections
// can go, however if it goes too fast then we start choking which causes
// frequent stalls anyway.
con.setNextSendTime(_context.clock().now() + 250); con.setNextSendTime(_context.clock().now() + 250);
} else { } else {
int delay = con.getOptions().getSendAckDelay(); int delay = con.getOptions().getSendAckDelay();
@ -222,14 +236,16 @@ class ConnectionPacketHandler {
} }
} }
boolean fastAck;
if (isSYN && (packet.getSendStreamId() <= 0) ) { if (isSYN && (packet.getSendStreamId() <= 0) ) {
// don't honor the ACK 0 in SYN packets received when the other side // don't honor the ACK 0 in SYN packets received when the other side
// has obviously not seen our messages // has obviously not seen our messages
fastAck = false;
} else { } else {
fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew, choke); fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew, choke);
} }
con.eventOccurred(); con.eventOccurred();
if (fastAck) { if (fastAck && !choke) {
if (!isNew) { if (!isNew) {
// if we're congested (fastAck) but this is also a new packet, // if we're congested (fastAck) but this is also a new packet,
// we've already scheduled an ack above, so there is no need to schedule // we've already scheduled an ack above, so there is no need to schedule
@ -266,6 +282,8 @@ class ConnectionPacketHandler {
/** /**
* Process the acks in a received packet, and adjust our window and RTT * Process the acks in a received packet, and adjust our window and RTT
* @param isNew was it a new packet? false for ack-only
* @param choke did we get a choke in the packet?
* @return are we congested? * @return are we congested?
*/ */
private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) { private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) {
@ -354,16 +372,25 @@ class ConnectionPacketHandler {
return rv; return rv;
} }
/** @return are we congested? */ /**
* This either does nothing or increases the window, it never decreases it.
* Decreasing is done in Connection.ResendPacketEvent.retransmit()
*
* @param isNew was it a new packet? false for ack-only
* @param sequenceNum 0 for ack-only
* @param choke did we get a choke in the packet?
* @return are we congested?
*/
private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) { private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) {
boolean congested = false; boolean congested;
if ( (!isNew) && (sequenceNum > 0) ) { if (choke || (!isNew && sequenceNum > 0) || con.isChoked()) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Congestion occurred on the sending side. Not adjusting window "+con); _log.debug("Congestion occurred on the sending side. Not adjusting window "+con);
congested = true; congested = true;
} } else {
congested = false;
}
long lowest = con.getHighestAckedThrough(); long lowest = con.getHighestAckedThrough();
// RFC 2581 // RFC 2581
// Why wait until we get a whole cwin to start updating the window? // Why wait until we get a whole cwin to start updating the window?

View File

@ -14,7 +14,7 @@ import net.i2p.util.Log;
/** /**
* Receive raw information from the I2PSession and turn it into * Receive raw information from the I2PSession and turn it into
* Packets, if we can. * Packets, if we can.
*&lt;p&gt; *<p>
* I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream * I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream
*/ */
class MessageHandler implements I2PSessionMuxedListener { class MessageHandler implements I2PSessionMuxedListener {

View File

@ -16,9 +16,9 @@ import net.i2p.util.Log;
/** /**
* Stream that can be given messages out of order * Stream that can be given messages out of order
* yet present them in order. * yet present them in order.
*&lt;p&gt; *<p>
* I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream * I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream
*&lt;p&gt; *<p>
* This buffers unlimited data via messageReceived() - * This buffers unlimited data via messageReceived() -
* limiting / blocking is done in ConnectionPacketHandler.receivePacket(). * limiting / blocking is done in ConnectionPacketHandler.receivePacket().
* *
@ -102,6 +102,7 @@ class MessageInputStream extends InputStream {
/** /**
* Determine if this packet will fit in our buffering limits. * Determine if this packet will fit in our buffering limits.
* Always returns true for zero payloadSize.
* *
* @return true if we have room. If false, do not call messageReceived() * @return true if we have room. If false, do not call messageReceived()
* @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock, * @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock,

View File

@ -16,7 +16,7 @@ import net.i2p.util.SimpleTimer2;
* A stream that we can shove data into that fires off those bytes * A stream that we can shove data into that fires off those bytes
* on flush or when the buffer is full. It also blocks according * on flush or when the buffer is full. It also blocks according
* to the data receiver's needs. * to the data receiver's needs.
*&lt;p&gt; *<p>
* MessageOutputStream -&gt; ConnectionDataReceiver -&gt; Connection -&gt; PacketQueue -&gt; I2PSession * MessageOutputStream -&gt; ConnectionDataReceiver -&gt; Connection -&gt; PacketQueue -&gt; I2PSession
*/ */
class MessageOutputStream extends OutputStream { class MessageOutputStream extends OutputStream {

View File

@ -164,6 +164,8 @@ class Packet {
public static final int DEFAULT_MAX_SIZE = 32*1024; public static final int DEFAULT_MAX_SIZE = 32*1024;
protected static final int MAX_DELAY_REQUEST = 65535; protected static final int MAX_DELAY_REQUEST = 65535;
public static final int MIN_DELAY_CHOKE = 60001;
public static final int SEND_DELAY_CHOKE = 61000;
/** /**
* Does no initialization. * Does no initialization.

View File

@ -11,7 +11,7 @@ import net.i2p.util.Log;
/** /**
* receive a packet and dispatch it correctly to the connection specified, * receive a packet and dispatch it correctly to the connection specified,
* the server socket, or queue a reply RST packet. * the server socket, or queue a reply RST packet.
*&lt;p&gt; *<p>
* I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream * I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream
*/ */
class PacketHandler { class PacketHandler {

View File

@ -111,18 +111,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
FLAG_ECHO); FLAG_ECHO);
} }
/** last minute update of ack fields, just before write/sign */
public void prepare() {
if (_connection != null)
_connection.getInputStream().updateAcks(this);
int numSends = _numSends.get();
if (numSends > 0) {
// so we can debug to differentiate resends
setOptionalDelay(numSends * 1000);
setFlag(FLAG_DELAY_REQUESTED);
}
}
public long getCreatedOn() { return _createdOn; } public long getCreatedOn() { return _createdOn; }
public long getLifetime() { return _context.clock().now() - _createdOn; } public long getLifetime() { return _context.clock().now() - _createdOn; }
public void incrementSends() { public void incrementSends() {

View File

@ -23,7 +23,7 @@ import net.i2p.util.SimpleTimer2;
* Well, thats the theory at least... in practice we just * Well, thats the theory at least... in practice we just
* send them immediately with no blocking, since the * send them immediately with no blocking, since the
* mode=bestEffort doesnt block in the SDK. * mode=bestEffort doesnt block in the SDK.
*&lt;p&gt; *<p>
* MessageOutputStream -&gt; ConnectionDataReceiver -&gt; Connection -&gt; PacketQueue -&gt; I2PSession * MessageOutputStream -&gt; ConnectionDataReceiver -&gt; Connection -&gt; PacketQueue -&gt; I2PSession
*/ */
class PacketQueue implements SendMessageStatusListener, Closeable { class PacketQueue implements SendMessageStatusListener, Closeable {
@ -64,7 +64,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
} }
/** /**
* Add a new packet to be sent out ASAP * Add a new packet to be sent out ASAP.
* This updates the acks.
* *
* keys and tags disabled since dropped in I2PSession * keys and tags disabled since dropped in I2PSession
* @return true if sent * @return true if sent
@ -72,8 +73,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
public boolean enqueue(PacketLocal packet) { public boolean enqueue(PacketLocal packet) {
if (_dead) if (_dead)
return false; return false;
// this updates the ack/nack field
packet.prepare();
//SessionKey keyUsed = packet.getKeyUsed(); //SessionKey keyUsed = packet.getKeyUsed();
//if (keyUsed == null) //if (keyUsed == null)
@ -87,6 +86,12 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
_log.debug("Not resending " + packet); _log.debug("Not resending " + packet);
return false; return false;
} }
Connection con = packet.getConnection();
if (con != null) {
// this updates the ack/nack fields
con.getInputStream().updateAcks(packet);
}
ByteArray ba = _cache.acquire(); ByteArray ba = _cache.acquire();
byte buf[] = ba.getData(); byte buf[] = ba.getData();
@ -96,14 +101,14 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
boolean sent = false; boolean sent = false;
try { try {
int size = 0; int size = 0;
long beforeWrite = System.currentTimeMillis(); //long beforeWrite = System.currentTimeMillis();
if (packet.shouldSign()) if (packet.shouldSign())
size = packet.writeSignedPacket(buf, 0); size = packet.writeSignedPacket(buf, 0);
else else
size = packet.writePacket(buf, 0); size = packet.writePacket(buf, 0);
long writeTime = System.currentTimeMillis() - beforeWrite; //long writeTime = System.currentTimeMillis() - beforeWrite;
if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) ) //if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("took " + writeTime + "ms to write the packet: " + packet); // _log.warn("took " + writeTime + "ms to write the packet: " + packet);
// last chance to short circuit... // last chance to short circuit...
if (packet.getAckTime() > 0) return false; if (packet.getAckTime() > 0) return false;
@ -121,7 +126,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
options.setDate(expires); options.setDate(expires);
boolean listenForStatus = false; boolean listenForStatus = false;
if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) { if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) {
Connection con = packet.getConnection();
if (con != null) { if (con != null) {
if (con.isInbound()) if (con.isInbound())
options.setSendLeaseSet(false); options.setSendLeaseSet(false);
@ -141,7 +145,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
options.setTagsToSend(FINAL_TAGS_TO_SEND); options.setTagsToSend(FINAL_TAGS_TO_SEND);
options.setTagThreshold(FINAL_TAG_THRESHOLD); options.setTagThreshold(FINAL_TAG_THRESHOLD);
} else { } else {
Connection con = packet.getConnection();
if (con != null) { if (con != null) {
if (con.isInbound() && con.getLifetime() < 2*60*1000) if (con.isInbound() && con.getLifetime() < 2*60*1000)
options.setSendLeaseSet(false); options.setSendLeaseSet(false);
@ -157,7 +160,7 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
long id = session.sendMessage(packet.getTo(), buf, 0, size, long id = session.sendMessage(packet.getTo(), buf, 0, size,
I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(), I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
options, this); options, this);
_messageStatusMap.put(Long.valueOf(id), packet.getConnection()); _messageStatusMap.put(Long.valueOf(id), con);
sent = true; sent = true;
} else { } else {
sent = session.sendMessage(packet.getTo(), buf, 0, size, sent = session.sendMessage(packet.getTo(), buf, 0, size,
@ -173,7 +176,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
if (packet.getNumSends() > 1) if (packet.getNumSends() > 1)
_context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime()); _context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime());
Connection con = packet.getConnection();
if (con != null) { if (con != null) {
con.incrementBytesSent(size); con.incrementBytesSent(size);
if (packet.getNumSends() > 1) if (packet.getNumSends() > 1)
@ -189,17 +191,15 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
if (!sent) { if (!sent) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Send failed for " + packet); _log.warn("Send failed for " + packet);
Connection c = packet.getConnection(); if (con != null) // handle race on b0rk
if (c != null) // handle race on b0rk con.disconnect(false);
c.disconnect(false);
} else { } else {
//packet.setKeyUsed(keyUsed); //packet.setKeyUsed(keyUsed);
//packet.setTagsSent(tagsSent); //packet.setTagsSent(tagsSent);
packet.incrementSends(); packet.incrementSends();
Connection c = packet.getConnection(); if (con != null && _log.shouldDebug()) {
if (c != null && _log.shouldDebug()) { String suffix = "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO();
String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO(); con.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix);
c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix);
} }
if (I2PSocketManagerFull.pcapWriter != null && if (I2PSocketManagerFull.pcapWriter != null &&
_context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP)) _context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP))

View File

@ -1,4 +1,21 @@
2017-02-09 zzz
* Streaming: Fix optional delay and choking (tickets #1046, 1939)
2017-02-08 zzz
* I2CP: Return local delivery failure on queue overflow (ticket #1939)
2017-02-05 zzz
* Console: Consolidate timer threads (ticket #1068)
* NTCP: Don't write to an inbound connection before
fully established, causing NPE (ticket #996)
* Streaming:
- Don't always send optional delay (ticket #1046)
- Don't hard fail on expired message error (ticket #1748)
2017-02-04 zzz 2017-02-04 zzz
* HTTP proxies:
- Pass through relative referer URIs, convert same-origin
absolute referer URIs to relative (ticket #1862)
* NTP: Enable IPv6 support (ticket #1896) * NTP: Enable IPv6 support (ticket #1896)
2017-01-30 zzz 2017-01-30 zzz

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 5; public final static long BUILD = 6;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";