From 9dfa87ba472f729a3c4ab5b335701b6c23b90b40 Mon Sep 17 00:00:00 2001 From: jrandom Date: Fri, 30 Sep 2005 23:12:57 +0000 Subject: [PATCH] 2005-09-30 jrandom * Killed three more streaming lib bugs, one of which caused excess packets to be transmitted (dupacking dupacks), one that was the root of many of the old hung streams (shrinking highest received), and another that was releasing data too soon. --- .../net/i2p/client/streaming/Connection.java | 20 ++++++++++------- .../streaming/ConnectionPacketHandler.java | 22 +++++++++++++------ .../client/streaming/MessageInputStream.java | 2 ++ .../src/net/i2p/client/streaming/Packet.java | 2 +- history.txt | 8 ++++++- .../src/net/i2p/router/RouterVersion.java | 4 ++-- 6 files changed, 39 insertions(+), 19 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index d73ae8c43..6161723d3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -311,16 +311,20 @@ public class Connection { } List ackPackets(long ackThrough, long nacks[]) { - if (nacks == null) { - _highestAckedThrough = ackThrough; + if (ackThrough < _highestAckedThrough) { + // dupack which won't tell us anything } else { - long lowest = -1; - for (int i = 0; i < nacks.length; i++) { - if ( (lowest < 0) || (nacks[i] < lowest) ) - lowest = nacks[i]; + if (nacks == null) { + _highestAckedThrough = ackThrough; + } else { + long lowest = -1; + for (int i = 0; i < nacks.length; i++) { + if ( (lowest < 0) || (nacks[i] < lowest) ) + lowest = nacks[i]; + } + if (lowest - 1 > _highestAckedThrough) + _highestAckedThrough = lowest - 1; } - if (lowest - 1 > _highestAckedThrough) - _highestAckedThrough = lowest - 1; } List acked = null; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 2e845e8f1..6a319fbdb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -100,10 +100,12 @@ public class ConnectionPacketHandler { (packet.getReceiveStreamId() <= 0) ) ) allowAck = false; - if (allowAck) + if (allowAck) { isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); - else - isNew = con.getInputStream().messageReceived(con.getInputStream().getHighestReadyBockId(), null); + } else { + con.getInputStream().notifyActivity(); + isNew = false; + } if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { if (_log.shouldLog(Log.DEBUG)) @@ -166,10 +168,16 @@ public class ConnectionPacketHandler { } con.eventOccurred(); if (fastAck) { - if (con.getLastSendTime() + 2000 < _context.clock().now()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Fast ack for dup " + packet); - con.ackImmediately(); + if (!isNew) { + // if we're congested (fastAck) but this is also a new packet, + // we've already scheduled an ack above, so there is no need to schedule + // a fast ack (we can wait a few ms) + } else { + if (con.getLastSendTime() + 2000 < _context.clock().now()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Fast ack for dup " + packet); + con.ackImmediately(); + } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 1b608005d..c894f98e3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -193,6 +193,8 @@ public class MessageInputStream extends InputStream { } } + public void notifyActivity() { synchronized (_dataLock) { _dataLock.notifyAll(); } } + /** * A new message has arrived - toss it on the appropriate queue (moving * previously pending messages to the ready queue if it fills the gap, etc). diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 06418beae..16e0c56c2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -222,7 +222,7 @@ public class Packet { return (_payload == null ? 0 : _payload.getValid()); } public void releasePayload() { - _payload = null; + //_payload = null; } public ByteArray acquirePayload() { ByteArray old = _payload; diff --git a/history.txt b/history.txt index 6fad8e1fb..ed5f3de10 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.273 2005/09/30 12:51:32 ragnarok Exp $ +$Id: history.txt,v 1.274 2005/09/30 15:29:19 jrandom Exp $ + +2005-09-30 jrandom + * Killed three more streaming lib bugs, one of which caused excess packets + to be transmitted (dupacking dupacks), one that was the root of many of + the old hung streams (shrinking highest received), and another that was + releasing data too soon. 2005-09-30 jrandom * Only allow autodetection of our IP address if we haven't received an diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 097a49946..c96469510 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.251 $ $Date: 2005/09/30 02:17:57 $"; + public final static String ID = "$Revision: 1.252 $ $Date: 2005/09/30 15:29:19 $"; public final static String VERSION = "0.6.1"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);