diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 27a6777560..d01d7a1da9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -69,6 +69,11 @@ class Connection { private int _lastCongestionSeenAt; private long _lastCongestionTime; private volatile long _lastCongestionHighestUnacked; + /** has the other side choked us? */ + private volatile boolean _isChoked; + /** are we choking the other side? */ + private volatile boolean _isChoking; + private final AtomicInteger _unchokesToSend = new AtomicInteger(); private final AtomicBoolean _ackSinceCongestion; /** Notify this on connection (or connection failure) */ private final Object _connectLock; @@ -102,6 +107,7 @@ class Connection { private static final long MAX_CONNECT_TIMEOUT = 2*60*1000; public static final int MAX_WINDOW_SIZE = 128; + private static final int UNCHOKES_TO_SEND = 8; /**** public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, @@ -187,7 +193,7 @@ class Connection { long timeLeft = writeExpire - _context.clock().now(); synchronized (_outboundPackets) { if (!started) - _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size(), timeoutMs); + _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size()); if (start + 5*60*1000 < _context.clock().now()) // ok, 5 minutes blocking? I dont think so return false; @@ -205,20 +211,20 @@ class Connection { // Limit (highest-lowest) to twice the window (if far end doesn't like it, it can send a choke) int unacked = _outboundPackets.size(); int wsz = _options.getWindowSize(); - if (unacked >= wsz || + if (_isChoked || unacked >= wsz || _activeResends.get() >= (wsz + 1) / 2 || _lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) { if (timeoutMs > 0) { if (timeLeft <= 0) { if (_log.shouldLog(Log.INFO)) - _log.info("Outbound window is full " + unacked + _log.info("Outbound window is full (choked? " + _isChoked + ' ' + unacked + " unacked with " + _activeResends + " active resends" + " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): " + toString()); return false; } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Outbound window is full (" + unacked + "/" + wsz + "/" + _log.debug("Outbound window is full (choked? " + _isChoked + ' ' + unacked + '/' + wsz + '/' + _activeResends + "), waiting " + timeLeft); try { _outboundPackets.wait(Math.min(timeLeft,250l)); @@ -240,7 +246,7 @@ class Connection { } //10*1000 } } else { - _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start); + _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size()); return true; } } @@ -257,7 +263,7 @@ class Connection { } void ackImmediately() { - PacketLocal packet = null; + PacketLocal packet; /*** why would we do this? was it to force a congestion indication at the other end? an expensive way to do that... @@ -343,6 +349,12 @@ class Connection { } } + /** + * This sends all 'normal' packets (acks and data) for the first time. + * Retransmits are done in ResendPacketEvent below. + * Resets, pings, and pongs are done elsewhere in this class, + * or in ConnectionManager or ConnectionHandler. + */ void sendPacket(PacketLocal packet) { if (packet == null) return; @@ -353,8 +365,15 @@ class Connection { } if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("No resend for " + packet); + // ACK-only + if (_isChoking) { + packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE); + packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + } else if (_unchokesToSend.decrementAndGet() > 0) { + // don't worry about wrapping around + packet.setOptionalDelay(0); + packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + } } else { int windowSize; int remaining; @@ -364,13 +383,18 @@ class Connection { remaining = windowSize - _outboundPackets.size() ; _outboundPackets.notifyAll(); } - // the other end has no idea what our window size is, so - // help him out by requesting acks below the 1/3 point, - // if remaining < 3, and every 8 minimum. - if (packet.isFlagSet(Packet.FLAG_CLOSE) || - (remaining < (windowSize + 2) / 3) || + + if (_isChoking) { + packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE); + packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + } else if (packet.isFlagSet(Packet.FLAG_CLOSE) || + _unchokesToSend.decrementAndGet() > 0 || + // the other end has no idea what our window size is, so + // help him out by requesting acks below the 1/3 point, + // if remaining < 3, and every 8 minimum. (remaining < 3) || - (packet.getSequenceNum() % 8 == 0)) { + (remaining < (windowSize + 2) / 3) /* || + (packet.getSequenceNum() % 8 == 0) */ ) { packet.setOptionalDelay(0); packet.setFlag(Packet.FLAG_DELAY_REQUESTED); //if (_log.shouldLog(Log.DEBUG)) @@ -380,15 +404,15 @@ class Connection { // since the other end limits it to getSendAckDelay() // which is always 2000, but it's good for diagnostics to see what the other end thinks // the RTT is. +/** int delay = _options.getRTT() / 2; packet.setOptionalDelay(delay); if (delay > 0) packet.setFlag(Packet.FLAG_DELAY_REQUESTED); if (_log.shouldLog(Log.DEBUG)) _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); +**/ } - // WHY always set? - //packet.setFlag(Packet.FLAG_DELAY_REQUESTED); long timeout = _options.getRTO(); if (timeout > MAX_RESEND_DELAY) @@ -984,6 +1008,55 @@ class Connection { } } + /** + * Set or clear if we are choking the other side. + * If on is true or the value has changed, this will call ackImmediately(). + * @param on true for choking + * @since 0.9.29 + */ + public void setChoking(boolean on) { + if (on != _isChoking) { + _isChoking = on; + if (!on) + _unchokesToSend.set(UNCHOKES_TO_SEND); + ackImmediately(); + } else if (on) { + ackImmediately(); + } + } + + /** + * Set or clear if we are being choked by the other side. + * @param on true for choked + * @since 0.9.29 + */ + public void setChoked(boolean on) { + _isChoked = on; + if (on) { + congestionOccurred(); + // https://en.wikipedia.org/wiki/Transmission_Control_Protocol + // When a receiver advertises a window size of 0, the sender stops sending data and starts the persist timer. + // The persist timer is used to protect TCP from a deadlock situation that could arise + // if a subsequent window size update from the receiver is lost, + // and the sender cannot send more data until receiving a new window size update from the receiver. + // When the persist timer expires, the TCP sender attempts recovery by sending a small packet + // so that the receiver responds by sending another acknowledgement containing the new window size. + // ... + // We don't do any of that, but we set the window size to 1, and let the retransmission + // of packets do the "attempted recovery". + getOptions().setWindowSize(1); + } + } + + /** + * Is the other side choking us? + * @return if choked + * @since 0.9.29 + */ + public boolean isChoked() { + return _isChoked; + } + /** how many packets have we sent and the other side has ACKed? * @return Count of how many packets ACKed. */ @@ -1381,10 +1454,18 @@ class Connection { // revamp various fields, in case we need to ack more, etc // updateAcks done in enqueue() //_inputStream.updateAcks(_packet); - int choke = getOptions().getChoke(); - _packet.setOptionalDelay(choke); - if (choke > 0) + if (_isChoking) { + _packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE); _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + } else if (_unchokesToSend.decrementAndGet() > 0) { + // don't worry about wrapping around + _packet.setOptionalDelay(0); + _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + } else { + // clear flag + _packet.setFlag(Packet.FLAG_DELAY_REQUESTED, false); + } + // this seems unnecessary to send the MSS again: //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); // bugfix release 0.7.8, we weren't dividing by 1000 @@ -1396,7 +1477,10 @@ class Connection { int newWindowSize = getOptions().getWindowSize(); - if (_ackSinceCongestion.get()) { + if (_isChoked) { + congestionOccurred(); + getOptions().setWindowSize(1); + } else if (_ackSinceCongestion.get()) { // only shrink the window once per window if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) { congestionOccurred(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java index c8098024c4..4450bb8c1f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java @@ -156,6 +156,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { } /** + * Compose a packet. + * Most flags are set here; however, some are set in Connection.sendPacket() + * and Connection.ResendPacketEvent.retransmit(). + * Take care not to set the same options both here and in Connection. + * * @param buf data to be sent - may be null * @param off offset into the buffer to start writing from * @param size how many bytes of the buffer to write (may be 0) @@ -164,12 +169,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return the packet to be sent */ private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) { - Connection con = _connection; if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")"); - boolean ackOnly = isAckOnly(con, size); - boolean isFirst = (con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0); + boolean ackOnly = isAckOnly(_connection, size); + boolean isFirst = (_connection.getAckedPackets() <= 0) && (_connection.getUnackedPacketsSent() <= 0); - PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con); + PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection); //ByteArray data = packet.acquirePayload(); ByteArray data = new ByteArray(new byte[size]); if (size > 0) @@ -180,36 +184,32 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if ( (ackOnly && !forceIncrement) && (!isFirst) ) packet.setSequenceNum(0); else - packet.setSequenceNum(con.getNextOutboundPacketNum()); - packet.setSendStreamId(con.getSendStreamId()); - packet.setReceiveStreamId(con.getReceiveStreamId()); + packet.setSequenceNum(_connection.getNextOutboundPacketNum()); + packet.setSendStreamId(_connection.getSendStreamId()); + packet.setReceiveStreamId(_connection.getReceiveStreamId()); // not needed here, handled in PacketQueue.enqueue() //con.getInputStream().updateAcks(packet); - // note that the optional delay is usually rewritten in Connection.sendPacket() - int choke = con.getOptions().getChoke(); - packet.setOptionalDelay(choke); - if (choke > 0) - packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + + // Do not set optional delay here, set in Connection.sendPacket() + // bugfix release 0.7.8, we weren't dividing by 1000 - packet.setResendDelay(con.getOptions().getResendDelay() / 1000); + packet.setResendDelay(_connection.getOptions().getResendDelay() / 1000); - if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) + if (_connection.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true); else packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false); - packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, con.getOptions().getRequireFullySigned()); - //if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) { if (isFirst) { packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setOptionalFrom(); - packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); + packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize()); } - packet.setLocalPort(con.getLocalPort()); - packet.setRemotePort(con.getPort()); - if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { + packet.setLocalPort(_connection.getLocalPort()); + packet.setRemotePort(_connection.getPort()); + if (_connection.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { packet.setFlag(Packet.FLAG_NO_ACK); } @@ -221,10 +221,10 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { // FIXME Implement better half-close by sending CLOSE whenever. Needs 0.9.9 bug fixes // throughout network? // - if (con.getOutputStream().getClosed() && - ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { + if (_connection.getOutputStream().getClosed() && + ( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); - con.notifyCloseSent(); + _connection.notifyCloseSent(); } if (_log.shouldLog(Log.DEBUG)) _log.debug("New OB pkt (acks not yet filled in): " + packet + " on " + _connection); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index b132eab536..f98062b662 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -105,13 +105,13 @@ class ConnectionManager { _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); // Stats for Connection - _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*60*1000 }); + _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*60*1000 }); + _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*60*1000 }); + _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 10*60*1000 }); // Stats for PacketQueue - _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 }); } Connection getConnectionByInboundId(long id) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java index cd268c1f34..6979f3abe8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java @@ -33,7 +33,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _resendDelay; private int _sendAckDelay; private int _maxMessageSize; - private int _choke; private int _maxResends; private int _inactivityTimeout; private int _inactivityAction; @@ -327,7 +326,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setWindowSize(opts.getWindowSize()); setResendDelay(opts.getResendDelay()); setMaxMessageSize(opts.getMaxMessageSize()); - setChoke(opts.getChoke()); setMaxResends(opts.getMaxResends()); setInactivityTimeout(opts.getInactivityTimeout()); setInactivityAction(opts.getInactivityAction()); @@ -677,15 +675,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public int getMaxMessageSize() { return _maxMessageSize; } public void setMaxMessageSize(int bytes) { _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE); } - /** - * how long we want to wait before any data is transferred on the - * connection in either direction - * - * @return how long to wait before any data is transferred in either direction in ms - */ - public int getChoke() { return _choke; } - public void setChoke(int ms) { _choke = ms; } - /** * What profile do we want to use for this connection? * TODO: Only bulk is supported so far. diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index bb5dd63f4b..7f37a0682a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -96,23 +96,27 @@ class ConnectionPacketHandler { boolean choke = false; if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) { - if (packet.getOptionalDelay() > 60000) { + if (packet.getOptionalDelay() >= Packet.MIN_DELAY_CHOKE) { // requested choke choke = true; + if (_log.shouldWarn()) + _log.warn("Got a choke on connection " + con + ": " + packet); //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); } + // Only call this if the flag is set + con.setChoked(choke); } if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) { if (_log.shouldWarn()) _log.warn("Inbound buffer exceeded on connection " + con + - ", dropping " + packet); - con.getOptions().setChoke(61*1000); + ", choking and dropping " + packet); + // this will call ackImmediately() + con.setChoking(true); + // TODO we could still process the acks for this packet before discarding packet.releasePayload(); - con.ackImmediately(); return; - } - con.getOptions().setChoke(0); + } // else we will call setChoking(false) below _context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize()); @@ -132,12 +136,20 @@ class ConnectionPacketHandler { // MessageInputStream will know the last sequence number. // But not ack-only packets! boolean isNew; - if (seqNum > 0 || isSYN) - isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()); - else - isNew = false; - if (!allowAck) + if (seqNum > 0 || isSYN) { + isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()) && + !allowAck; + } else { isNew = false; + } + + if (isNew && packet.getPayloadSize() > 1500) { + // don't clear choking unless it was new, and a big packet + // this will call ackImmediately() if changed + // TODO if this filled in a hole, we shouldn't unchoke + // TODO a bunch of small packets should unchoke also + con.setChoking(false); + } //if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { // if (_log.shouldLog(Log.DEBUG)) @@ -158,7 +170,6 @@ class ConnectionPacketHandler { _log.debug(type + " IB pkt: " + packet + " on " + con); } - boolean fastAck = false; boolean ackOnly = false; if (isNew) { @@ -170,6 +181,9 @@ class ConnectionPacketHandler { _log.debug("Scheduling immediate ack for " + packet); //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); // honor request "almost" immediately + // TODO the 250 below _may_ be a big limiter in how fast local "loopback" connections + // can go, however if it goes too fast then we start choking which causes + // frequent stalls anyway. con.setNextSendTime(_context.clock().now() + 250); } else { int delay = con.getOptions().getSendAckDelay(); @@ -222,14 +236,16 @@ class ConnectionPacketHandler { } } + boolean fastAck; if (isSYN && (packet.getSendStreamId() <= 0) ) { // don't honor the ACK 0 in SYN packets received when the other side // has obviously not seen our messages + fastAck = false; } else { fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew, choke); } con.eventOccurred(); - if (fastAck) { + if (fastAck && !choke) { if (!isNew) { // if we're congested (fastAck) but this is also a new packet, // we've already scheduled an ack above, so there is no need to schedule @@ -266,6 +282,8 @@ class ConnectionPacketHandler { /** * Process the acks in a received packet, and adjust our window and RTT + * @param isNew was it a new packet? false for ack-only + * @param choke did we get a choke in the packet? * @return are we congested? */ private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) { @@ -354,16 +372,25 @@ class ConnectionPacketHandler { return rv; } - /** @return are we congested? */ + /** + * This either does nothing or increases the window, it never decreases it. + * Decreasing is done in Connection.ResendPacketEvent.retransmit() + * + * @param isNew was it a new packet? false for ack-only + * @param sequenceNum 0 for ack-only + * @param choke did we get a choke in the packet? + * @return are we congested? + */ private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) { - boolean congested = false; - if ( (!isNew) && (sequenceNum > 0) ) { + boolean congested; + if (choke || (!isNew && sequenceNum > 0) || con.isChoked()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Congestion occurred on the sending side. Not adjusting window "+con); - congested = true; - } - + } else { + congested = false; + } + long lowest = con.getHighestAckedThrough(); // RFC 2581 // Why wait until we get a whole cwin to start updating the window? diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java index 477f7476fd..234585c04f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java @@ -14,7 +14,7 @@ import net.i2p.util.Log; /** * Receive raw information from the I2PSession and turn it into * Packets, if we can. - *<p> + *

* I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream */ class MessageHandler implements I2PSessionMuxedListener { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index adf401a343..858f59157e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -16,9 +16,9 @@ import net.i2p.util.Log; /** * Stream that can be given messages out of order * yet present them in order. - *<p> + *

* I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream - *<p> + *

* This buffers unlimited data via messageReceived() - * limiting / blocking is done in ConnectionPacketHandler.receivePacket(). * @@ -102,6 +102,7 @@ class MessageInputStream extends InputStream { /** * Determine if this packet will fit in our buffering limits. + * Always returns true for zero payloadSize. * * @return true if we have room. If false, do not call messageReceived() * @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock, diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java index b5f1113d9e..72b2e02511 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java @@ -16,7 +16,7 @@ import net.i2p.util.SimpleTimer2; * A stream that we can shove data into that fires off those bytes * on flush or when the buffer is full. It also blocks according * to the data receiver's needs. - *<p> + *

* MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession */ class MessageOutputStream extends OutputStream { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java index f9e8cdecf8..72751f0c33 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java @@ -164,6 +164,8 @@ class Packet { public static final int DEFAULT_MAX_SIZE = 32*1024; protected static final int MAX_DELAY_REQUEST = 65535; + public static final int MIN_DELAY_CHOKE = 60001; + public static final int SEND_DELAY_CHOKE = 61000; /** * Does no initialization. diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java index 7ca70cd278..39350affc9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java @@ -11,7 +11,7 @@ import net.i2p.util.Log; /** * receive a packet and dispatch it correctly to the connection specified, * the server socket, or queue a reply RST packet. - *<p> + *

* I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream */ class PacketHandler { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java index 2b26b0aada..80b4d29204 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java @@ -111,18 +111,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { FLAG_ECHO); } - /** last minute update of ack fields, just before write/sign */ - public void prepare() { - if (_connection != null) - _connection.getInputStream().updateAcks(this); - int numSends = _numSends.get(); - if (numSends > 0) { - // so we can debug to differentiate resends - setOptionalDelay(numSends * 1000); - setFlag(FLAG_DELAY_REQUESTED); - } - } - public long getCreatedOn() { return _createdOn; } public long getLifetime() { return _context.clock().now() - _createdOn; } public void incrementSends() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index aaa91e685a..5d807ddc97 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -23,7 +23,7 @@ import net.i2p.util.SimpleTimer2; * Well, thats the theory at least... in practice we just * send them immediately with no blocking, since the * mode=bestEffort doesnt block in the SDK. - *<p> + *

* MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession */ class PacketQueue implements SendMessageStatusListener, Closeable { @@ -64,7 +64,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable { } /** - * Add a new packet to be sent out ASAP + * Add a new packet to be sent out ASAP. + * This updates the acks. * * keys and tags disabled since dropped in I2PSession * @return true if sent @@ -72,8 +73,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable { public boolean enqueue(PacketLocal packet) { if (_dead) return false; - // this updates the ack/nack field - packet.prepare(); //SessionKey keyUsed = packet.getKeyUsed(); //if (keyUsed == null) @@ -87,6 +86,12 @@ class PacketQueue implements SendMessageStatusListener, Closeable { _log.debug("Not resending " + packet); return false; } + + Connection con = packet.getConnection(); + if (con != null) { + // this updates the ack/nack fields + con.getInputStream().updateAcks(packet); + } ByteArray ba = _cache.acquire(); byte buf[] = ba.getData(); @@ -96,14 +101,14 @@ class PacketQueue implements SendMessageStatusListener, Closeable { boolean sent = false; try { int size = 0; - long beforeWrite = System.currentTimeMillis(); + //long beforeWrite = System.currentTimeMillis(); if (packet.shouldSign()) size = packet.writeSignedPacket(buf, 0); else size = packet.writePacket(buf, 0); - long writeTime = System.currentTimeMillis() - beforeWrite; - if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("took " + writeTime + "ms to write the packet: " + packet); + //long writeTime = System.currentTimeMillis() - beforeWrite; + //if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) ) + // _log.warn("took " + writeTime + "ms to write the packet: " + packet); // last chance to short circuit... if (packet.getAckTime() > 0) return false; @@ -121,7 +126,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable { options.setDate(expires); boolean listenForStatus = false; if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) { - Connection con = packet.getConnection(); if (con != null) { if (con.isInbound()) options.setSendLeaseSet(false); @@ -141,7 +145,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable { options.setTagsToSend(FINAL_TAGS_TO_SEND); options.setTagThreshold(FINAL_TAG_THRESHOLD); } else { - Connection con = packet.getConnection(); if (con != null) { if (con.isInbound() && con.getLifetime() < 2*60*1000) options.setSendLeaseSet(false); @@ -157,7 +160,7 @@ class PacketQueue implements SendMessageStatusListener, Closeable { long id = session.sendMessage(packet.getTo(), buf, 0, size, I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(), options, this); - _messageStatusMap.put(Long.valueOf(id), packet.getConnection()); + _messageStatusMap.put(Long.valueOf(id), con); sent = true; } else { sent = session.sendMessage(packet.getTo(), buf, 0, size, @@ -173,7 +176,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable { if (packet.getNumSends() > 1) _context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime()); - Connection con = packet.getConnection(); if (con != null) { con.incrementBytesSent(size); if (packet.getNumSends() > 1) @@ -189,17 +191,15 @@ class PacketQueue implements SendMessageStatusListener, Closeable { if (!sent) { if (_log.shouldLog(Log.WARN)) _log.warn("Send failed for " + packet); - Connection c = packet.getConnection(); - if (c != null) // handle race on b0rk - c.disconnect(false); + if (con != null) // handle race on b0rk + con.disconnect(false); } else { //packet.setKeyUsed(keyUsed); //packet.setTagsSent(tagsSent); packet.incrementSends(); - Connection c = packet.getConnection(); - if (c != null && _log.shouldDebug()) { - String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO(); - c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix); + if (con != null && _log.shouldDebug()) { + String suffix = "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO(); + con.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix); } if (I2PSocketManagerFull.pcapWriter != null && _context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP)) diff --git a/history.txt b/history.txt index fcb36a48ec..b547287ae4 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,21 @@ +2017-02-09 zzz + * Streaming: Fix optional delay and choking (tickets #1046, 1939) + +2017-02-08 zzz + * I2CP: Return local delivery failure on queue overflow (ticket #1939) + +2017-02-05 zzz + * Console: Consolidate timer threads (ticket #1068) + * NTCP: Don't write to an inbound connection before + fully established, causing NPE (ticket #996) + * Streaming: + - Don't always send optional delay (ticket #1046) + - Don't hard fail on expired message error (ticket #1748) + 2017-02-04 zzz + * HTTP proxies: + - Pass through relative referer URIs, convert same-origin + absolute referer URIs to relative (ticket #1862) * NTP: Enable IPv6 support (ticket #1896) 2017-01-30 zzz diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 4e28a2c8d2..c10128fe62 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 5; + public final static long BUILD = 6; /** for example "-test" */ public final static String EXTRA = "";