diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 5a41fb394c..a556c95ffd 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -127,7 +127,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { static final int INITIAL_WINDOW_SIZE = 6; static final int DEFAULT_MAX_SENDS = 8; public static final int DEFAULT_INITIAL_RTT = 8*1000; - public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000; + public static final int DEFAULT_INITIAL_ACK_DELAY = 1000; static final int MIN_WINDOW_SIZE = 1; private static final boolean DEFAULT_ANSWER_PINGS = true; private static final int DEFAULT_INACTIVITY_TIMEOUT = 90*1000; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 9d353075cb..62283ddc66 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -37,6 +37,8 @@ class ConnectionPacketHandler { _context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.initialRTT.in", "What is the actual RTT for the first packet of an inbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.initialRTT.out", "What is the actual RTT for the first packet of an outbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createFrequencyStat("stream.ack.dup.immediate","How often duplicate packets get acked immediately","Stream",new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.ack.dup.sent","Whether the ack for a duplicate packet was sent as scheduled","Stream",new long[] { 10*60*1000, 60*60*1000 }); } /** distribute a packet to the connection specified */ @@ -184,11 +186,28 @@ class ConnectionPacketHandler { con.incrementDupMessagesReceived(1); // take note of congestion + + final long now = _context.clock().now(); + final int ackDelay = con.getOptions().getSendAckDelay(); + final long lastSendTime = con.getLastSendTime(); + if (_log.shouldLog(Log.WARN)) - _log.warn("congestion.. dup " + packet); - _context.simpleScheduler().addEvent(new AckDup(con), con.getOptions().getSendAckDelay()); - //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); - //fastAck = true; + _log.warn(String.format("%s congestion.. dup packet %s now %d ackDelay %d lastSend %d", + con, packet, now, ackDelay, lastSendTime)); + + final long nextSendTime = lastSendTime + ackDelay; + if (nextSendTime <= now) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("immediate ack"); + con.ackImmediately(); + _context.statManager().getFrequency("stream.ack.dup.immediate").eventOccurred(); + } else { + final long delay = nextSendTime - now; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("scheduling ack in "+delay); + _context.simpleScheduler().addEvent(new AckDup(con), delay); + } + } else { if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { //con.incrementUnackedPacketsReceived(); @@ -542,6 +561,7 @@ class ConnectionPacketHandler { } public void timeReached() { + boolean sent = false; if (_con.getLastSendTime() <= _created) { if (_con.getResetReceived() || _con.getResetSent()) { if (_log.shouldLog(Log.DEBUG)) @@ -554,10 +574,12 @@ class ConnectionPacketHandler { // we haven't done anything since receiving the dup, send an // ack now _con.ackImmediately(); + sent = true; } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Ack dup on " + _con + ", but we have sent (" + (_con.getLastSendTime()-_created) + ")"); } + _context.statManager().getRate("stream.ack.dup.sent").addData(sent ? 1 : 0); } } } diff --git a/history.txt b/history.txt index 168e88c82f..16b1c2e0e8 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2013-08-11 zab + * Streaming: + - reduce initial ack delay 2000->1000 + - rework the logic of acking duplicate packets + * 2013-08-10 0.9.7.1 released 2012-08-10 zzz