diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 9435727c4..aa826f799 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -189,9 +189,9 @@ public class Connection { + _activeResends + "), waiting " + timeLeft); try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;} } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends - + "), waiting indefinitely"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + // + "), waiting indefinitely"); try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000 } } else { @@ -297,37 +297,48 @@ public class Connection { if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { ackOnly = true; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("No resend for " + packet); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("No resend for " + packet); } else { - int remaining = 0; + int windowSize; + int remaining; synchronized (_outboundPackets) { _outboundPackets.put(new Long(packet.getSequenceNum()), packet); - remaining = _options.getWindowSize() - _outboundPackets.size() ; + windowSize = _options.getWindowSize(); + remaining = windowSize - _outboundPackets.size() ; _outboundPackets.notifyAll(); } - if (remaining < 0) - remaining = 0; - if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) { + // the other end has no idea what our window size is, so + // help him out by requesting acks below the 1/3 point, + // if remaining < 3, and every 8 minimum. + if (packet.isFlagSet(Packet.FLAG_CLOSE) || + (remaining < (windowSize + 2) / 3) || + (remaining < 3) || + (packet.getSequenceNum() % 8 == 0)) { packet.setOptionalDelay(0); packet.setFlag(Packet.FLAG_DELAY_REQUESTED); if (_log.shouldLog(Log.DEBUG)) _log.debug("Requesting no ack delay for packet " + packet); } else { - int delay = _options.getRTO() / 2; + // This is somewhat of a waste of time, unless the RTT < 4000, + // since the other end limits it to getSendAckDelay() + // which is always 2000, but it's good for diagnostics to see what the other end thinks + // the RTT is. + int delay = _options.getRTT() / 2; packet.setOptionalDelay(delay); if (delay > 0) packet.setFlag(Packet.FLAG_DELAY_REQUESTED); if (_log.shouldLog(Log.DEBUG)) _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); } + // WHY always set? packet.setFlag(Packet.FLAG_DELAY_REQUESTED); long timeout = _options.getRTO(); if (timeout > MAX_RESEND_DELAY) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by")); + _log.debug("Resend in " + timeout + " for " + packet); // schedules itself ResendPacketEvent rpe = new ResendPacketEvent(packet, timeout); @@ -370,6 +381,10 @@ public class Connection { } *********/ + /** + * Process the acks and nacks received in a packet + * @return List of packets acked or null + */ List ackPackets(long ackThrough, long nacks[]) { if (ackThrough < _highestAckedThrough) { // dupack which won't tell us anything @@ -685,6 +700,14 @@ public class Connection { * @return the next time the scheduler will want to send a packet, or -1 if never. */ public long getNextSendTime() { return _nextSendTime; } + + /** + * If the next send time is currently >= 0 (i.e. not "never"), + * this may make the next time sooner but will not make it later. + * If the next send time is currently < 0 (i.e. "never"), + * this will set it to the time specified, but not later than + * options.getSendAckDelay() from now (2000 ms) + */ public void setNextSendTime(long when) { if (_nextSendTime >= 0) { if (when < _nextSendTime) @@ -699,12 +722,12 @@ public class Connection { _nextSendTime = max; } - if (_log.shouldLog(Log.DEBUG) && false) { - if (_nextSendTime <= 0) - _log.debug("set next send time to an unknown time", new Exception(toString())); - else - _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString())); - } + //if (_log.shouldLog(Log.DEBUG) && false) { + // if (_nextSendTime <= 0) + // _log.debug("set next send time to an unknown time", new Exception(toString())); + // else + // _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString())); + //} } /** how many packets have we sent and the other side has ACKed? @@ -742,7 +765,9 @@ public class Connection { public long getCongestionWindowEnd() { return _congestionWindowEnd; } public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; } + /** @return the highest outbound packet we have recieved an ack for */ public long getHighestAckedThrough() { return _highestAckedThrough; } + /** @deprecated unused */ public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; } public long getLastActivityOn() { @@ -835,8 +860,8 @@ public class Connection { } long howLong = _options.getInactivityTimeout(); howLong += _randomWait; // randomize it a bit, so both sides don't do it at once - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resetting the inactivity timer to " + howLong, new Exception(toString())); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Resetting the inactivity timer to " + howLong); // this will get rescheduled, and rescheduled, and rescheduled... _activityTimer.reschedule(howLong, false); // use the later of current and previous timeout } @@ -1087,6 +1112,8 @@ public class Connection { // we want to resend this packet, but there are already active // resends in the air and we dont want to make a bad situation // worse. wait another second + // BUG? seq# = 0, activeResends = 0, loop forever - why? + // also seen with seq# > 0. Is the _activeResends count reliable? if (_log.shouldLog(Log.INFO)) _log.info("Delaying resend of " + _packet + " as there are " + _activeResends + " active resends already in play"); @@ -1104,6 +1131,7 @@ public class Connection { _packet.setOptionalDelay(choke); if (choke > 0) _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + // this seems unnecessary to send the MSS again: _packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); // bugfix release 0.7.8, we weren't dividing by 1000 _packet.setResendDelay(getOptions().getResendDelay() / 1000); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 3b55160d3..acec982f6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -166,6 +166,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setReceiveStreamId(con.getReceiveStreamId()); con.getInputStream().updateAcks(packet); + // note that the optional delay is usually rewritten in Connection.sendPacket() int choke = con.getOptions().getChoke(); packet.setOptionalDelay(choke); if (choke > 0) @@ -197,12 +198,9 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); con.setCloseSentOn(_context.clock().now()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closed is set for a new packet on " + con + ": " + packet); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet); } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New outbound packet on " + _connection + ": " + packet); return packet; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index bd32afbf7..d8f50fbcf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -46,6 +46,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay"; public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay"; public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize"; + /** unused */ public static final String PROP_INITIAL_RECEIVE_WINDOW = "i2p.streaming.initialReceiveWindow"; public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout"; public static final String PROP_INACTIVITY_ACTION = "i2p.streaming.inactivityAction"; @@ -58,6 +59,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { static final int INITIAL_WINDOW_SIZE = 6; static final int DEFAULT_MAX_SENDS = 8; public static final int DEFAULT_INITIAL_RTT = 8*1000; + public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000; static final int MIN_WINDOW_SIZE = 1; private static final boolean DEFAULT_ANSWER_PINGS = true; @@ -217,7 +219,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setRTT(getInt(opts, PROP_INITIAL_RTT, DEFAULT_INITIAL_RTT)); setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); - setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000)); + setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); @@ -249,7 +251,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { if (opts.containsKey(PROP_INITIAL_RESEND_DELAY)) setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); if (opts.containsKey(PROP_INITIAL_ACK_DELAY)) - setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000)); + setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE)) setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); if (opts.containsKey(PROP_MAX_RESENDS)) @@ -295,6 +297,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { * @return if we want signatures on all packets. */ public boolean getRequireFullySigned() { return _fullySigned; } + /** unused, see above */ public void setRequireFullySigned(boolean sign) { _fullySigned = sign; } /** @@ -325,7 +328,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { } /** after how many consecutive messages should we ack? - * This doesn't appear to be used. + * @deprecated This doesn't appear to be used. * @return receive window size. */ public int getReceiveWindow() { return _receiveWindow; } @@ -405,6 +408,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { * @return ACK delay in ms */ public int getSendAckDelay() { return _sendAckDelay; } + /** + * Unused except here, so expect the default initial delay of 2000 ms unless set by the user + * to remain constant. + */ public void setSendAckDelay(int delayMs) { _sendAckDelay = delayMs; } /** What is the largest message we want to send or receive? diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 91a06e088..ba705b2d5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -131,11 +131,14 @@ public class ConnectionPacketHandler { isNew = false; } - if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew - + " packet: " + packet + " con: " + con); - } + //if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { + // if (_log.shouldLog(Log.DEBUG)) + // _log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew + // + " packet: " + packet + " con: " + con); + //} + + if (_log.shouldLog(Log.DEBUG)) + _log.debug((isNew ? "New" : "Dup or ack-only") + " inbound packet on " + con + ": " + packet); // close *after* receiving the data, as well as after verifying the signatures / etc if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED)) @@ -151,7 +154,9 @@ public class ConnectionPacketHandler { if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Scheduling immediate ack for " + packet); - con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + // honor request "almost" immediately + con.setNextSendTime(_context.clock().now() + 250); } else { int delay = con.getOptions().getSendAckDelay(); if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested @@ -222,6 +227,10 @@ public class ConnectionPacketHandler { // con.fastRetransmit(); } + /** + * Process the acks in a received packet, and adjust our window and RTT + * @return are we congested? + */ private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) { if (ackThrough < 0) return false; //if ( (nacks != null) && (nacks.length > 0) ) @@ -287,7 +296,7 @@ public class ConnectionPacketHandler { return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke); } - + /** @return are we congested? */ private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) { boolean congested = false; if ( (!isNew) && (sequenceNum > 0) ) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 85981d9e6..14ebb61d5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -42,7 +42,7 @@ import net.i2p.util.Log; *
If the signature is included, it uses the Destination's DSA key