diff --git a/router/java/src/net/i2p/router/OutNetMessagePool.java b/router/java/src/net/i2p/router/OutNetMessagePool.java index 6b5ddfaab6..6558a9b3ec 100644 --- a/router/java/src/net/i2p/router/OutNetMessagePool.java +++ b/router/java/src/net/i2p/router/OutNetMessagePool.java @@ -8,8 +8,6 @@ package net.i2p.router; * */ -import java.util.Comparator; - import net.i2p.util.Log; /** diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 33fd401df1..7136f8a9de 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -14,13 +14,14 @@ import net.i2p.util.Log; /** * Blocking thread that is given peers by the inboundFragment pool, sending out * any outstanding ACKs. - * + * The ACKs are sent directly to UDPSender, + * bypassing OutboundMessageFragments and PacketPusher. */ class ACKSender implements Runnable { - private RouterContext _context; - private Log _log; - private UDPTransport _transport; - private PacketBuilder _builder; + private final RouterContext _context; + private final Log _log; + private final UDPTransport _transport; + private final PacketBuilder _builder; /** list of peers (PeerState) who we have received data from but not yet ACKed to */ private final BlockingQueue _peersToACK; private boolean _alive; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 24e0efe877..5d5021b597 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -72,6 +72,19 @@ class EstablishmentManager { _context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES); + // following are for PeerState + _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES); } public void startup() { @@ -318,6 +331,7 @@ class EstablishmentManager { /** * Got a SessionDestroy on an established conn + * @since 0.8.1 */ void receiveSessionDestroy(RemoteHostId from, PeerState state) { if (_log.shouldLog(Log.DEBUG)) @@ -327,6 +341,7 @@ class EstablishmentManager { /** * Got a SessionDestroy during outbound establish + * @since 0.8.1 */ void receiveSessionDestroy(RemoteHostId from, OutboundEstablishState state) { if (_log.shouldLog(Log.DEBUG)) @@ -338,6 +353,7 @@ class EstablishmentManager { /** * Got a SessionDestroy - maybe after an inbound establish + * @since 0.8.1 */ void receiveSessionDestroy(RemoteHostId from) { if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index f685c9e787..9d68f898b5 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -18,14 +18,14 @@ import net.i2p.util.Log; * */ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; /** list of message IDs recently received, so we can ignore in flight dups */ private DecayingBloomFilter _recentlyCompletedMessages; - private OutboundMessageFragments _outbound; - private UDPTransport _transport; - private ACKSender _ackSender; - private MessageReceiver _messageReceiver; + private final OutboundMessageFragments _outbound; + private final UDPTransport _transport; + private final ACKSender _ackSender; + private final MessageReceiver _messageReceiver; private boolean _alive; /** decay the recently completed every 20 seconds */ @@ -148,8 +148,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ from.messageFullyReceived(messageId, state.getCompleteSize()); _ackSender.ackPeer(from); - if (_log.shouldLog(Log.INFO)) - _log.info("Message received completely! " + state); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message received completely! " + state); _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); if (state.getFragmentCount() > 0) @@ -158,7 +158,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ state.releaseResources(); if (_log.shouldLog(Log.WARN)) _log.warn("Message expired while only being partially read: " + state); - _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString()); + _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString()); } else if (partialACK) { // not expired but not yet complete... lets queue up a partial ACK if (_log.shouldLog(Log.DEBUG)) @@ -174,10 +174,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ return fragments; } + /** + * @return the number of bitfields in the ack? why? + */ private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) { int rv = 0; + boolean newAck = false; if (data.readACKsIncluded()) { - int fragments = 0; int ackCount = data.readACKCount(); if (ackCount > 0) { rv += ackCount; @@ -186,9 +189,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ for (int i = 0; i < ackCount; i++) { long id = data.readACK(i); - if (_log.shouldLog(Log.INFO)) - _log.info("Full ACK of message " + id + " received!"); - fragments += _outbound.acked(id, from.getRemotePeer()); + if (from.acked(id)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer()); + newAck = true; + //} else if (_log.shouldLog(Log.DEBUG)) { + // _log.debug("Dup full ACK of message " + id + " received from " + from.getRemotePeer()); + } } } else { _log.error("Received ACKs with no acks?! " + data); @@ -201,9 +208,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ //_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0); for (int i = 0; i < bitfields.length; i++) { - if (_log.shouldLog(Log.INFO)) - _log.info("Partial ACK received: " + bitfields[i]); - _outbound.acked(bitfields[i], from.getRemotePeer()); + if (from.acked(bitfields[i])) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer()); + newAck = true; + } else if (_log.shouldLog(Log.DEBUG)) { + _log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer()); + } } } } @@ -211,6 +222,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ from.ECNReceived(); else from.dataReceived(); + + // Wake up the packet pusher if it is sleeping. + // By calling add(), this also is a failsafe against possible + // races in OutboundMessageFragments. + if (newAck && from.getOutboundMessageCount() > 0) + _outbound.add(from); + return rv; } } diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index 9261dc70a3..6bc349862b 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -12,20 +12,20 @@ import net.i2p.util.Log; * */ class InboundMessageState { - private RouterContext _context; - private Log _log; - private long _messageId; - private Hash _from; + private final RouterContext _context; + private final Log _log; + private final long _messageId; + private final Hash _from; /** * indexed array of fragments for the message, where not yet * received fragments are null. */ - private ByteArray _fragments[]; + private final ByteArray _fragments[]; /** * what is the last fragment in the message (or -1 if not yet known) */ private int _lastFragment; - private long _receiveBegin; + private final long _receiveBegin; private int _completeSize; private boolean _released; @@ -33,7 +33,8 @@ class InboundMessageState { private static final long MAX_RECEIVE_TIME = 10*1000; public static final int MAX_FRAGMENTS = 64; - private static final ByteCache _fragmentCache = ByteCache.getInstance(64, 2048); + private static final int MAX_FRAGMENT_SIZE = UDPPacket.MAX_PACKET_SIZE; + private static final ByteCache _fragmentCache = ByteCache.getInstance(64, MAX_FRAGMENT_SIZE); public InboundMessageState(RouterContext ctx, long messageId, Hash from) { _context = ctx; @@ -153,10 +154,12 @@ class InboundMessageState { } public void releaseResources() { - if (_fragments != null) - for (int i = 0; i < _fragments.length; i++) + for (int i = 0; i < _fragments.length; i++) { + if (_fragments[i] != null) { _fragmentCache.release(_fragments[i]); - //_fragments = null; + _fragments[i] = null; + } + } _released = true; } @@ -178,7 +181,7 @@ class InboundMessageState { buf.append(" completely received with "); buf.append(getCompleteSize()).append(" bytes"); } else { - for (int i = 0; (_fragments != null) && (i < _fragments.length); i++) { + for (int i = 0; i < _lastFragment; i++) { buf.append(" fragment ").append(i); if (_fragments[i] != null) buf.append(": known at size ").append(_fragments[i].getValid()); diff --git a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java index b39f44de9a..85855b0395 100644 --- a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java +++ b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java @@ -21,10 +21,10 @@ import net.i2p.util.Log; * */ class IntroductionManager { - private RouterContext _context; - private Log _log; - private UDPTransport _transport; - private PacketBuilder _builder; + private final RouterContext _context; + private final Log _log; + private final UDPTransport _transport; + private final PacketBuilder _builder; /** map of relay tag to PeerState that should receive the introduction */ private final Map _outbound; /** list of peers (PeerState) who have given us introduction tags */ diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index dede98ef7d..c0bce608e0 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -20,9 +20,9 @@ import net.i2p.util.Log; * {@link net.i2p.router.InNetMessagePool} by way of the {@link UDPTransport}. */ class MessageReceiver { - private RouterContext _context; - private Log _log; - private UDPTransport _transport; + private final RouterContext _context; + private final Log _log; + private final UDPTransport _transport; /** list of messages (InboundMessageState) fully received but not interpreted yet */ private final BlockingQueue _completeMessages; private boolean _alive; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index 76753a5f58..0c4f265f72 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -50,7 +50,7 @@ class OutboundEstablishState { private long _nextSend; private RemoteHostId _remoteHostId; private final RouterIdentity _remotePeer; - private SessionKey _introKey; + private final SessionKey _introKey; private final Queue _queuedMessages; private int _currentState; private long _introductionNonce; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 3af1adc2cc..26ce9c44af 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -1,13 +1,16 @@ package net.i2p.router.transport.udp; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Set; import net.i2p.data.Hash; import net.i2p.data.RouterInfo; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; /** @@ -23,16 +26,33 @@ import net.i2p.util.Log; * */ class OutboundMessageFragments { - private RouterContext _context; - private Log _log; - private UDPTransport _transport; + private final RouterContext _context; + private final Log _log; + private final UDPTransport _transport; // private ActiveThrottle _throttle; // LINT not used ?? - /** peers we are actively sending messages to */ - private final List _activePeers; + + /** + * Peers we are actively sending messages to. + * We use the iterator so we treat it like a list, + * but we use a HashSet so remove() is fast and + * we don't need to do contains(). + * Even though most (but NOT all) accesses are synchronized, + * we use a ConcurrentHashSet as the iterator is long-lived. + */ + private final Set _activePeers; + + /** + * The long-lived iterator over _activePeers. + */ + private Iterator _iterator; + + /** + * Avoid sync in add() if possible (not 100% reliable) + */ + private boolean _isWaiting; + private boolean _alive; - /** which peer should we build the next packet out of? */ - private int _nextPeer; - private PacketBuilder _builder; + private final PacketBuilder _builder; private long _lastCycleTime = System.currentTimeMillis(); /** if we can handle more messages explicitly, set this to true */ @@ -42,13 +62,14 @@ class OutboundMessageFragments { // private static final int MAX_ACTIVE = 64; // not used. // don't send a packet more than 10 times static final int MAX_VOLLEYS = 10; + private static final int MAX_WAIT = 1000; public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) { _context = ctx; _log = ctx.logManager().getLog(OutboundMessageFragments.class); _transport = transport; // _throttle = throttle; - _activePeers = new ArrayList(256); + _activePeers = new ConcurrentHashSet(256); _builder = new PacketBuilder(ctx, transport); _alive = true; // _allowExcess = false; @@ -59,6 +80,7 @@ class OutboundMessageFragments { _context.statManager().createRateStat("udp.sendFailed", "How many sends a failed message was pushed", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES); @@ -72,20 +94,20 @@ class OutboundMessageFragments { } public void startup() { _alive = true; } + public void shutdown() { _alive = false; + _activePeers.clear(); synchronized (_activePeers) { _activePeers.notifyAll(); } } + void dropPeer(PeerState peer) { if (_log.shouldLog(Log.INFO)) _log.info("Dropping peer " + peer.getRemotePeer().toBase64()); peer.dropOutbound(); - synchronized (_activePeers) { - _activePeers.remove(peer); - _activePeers.notifyAll(); - } + _activePeers.remove(peer); } /** @@ -145,24 +167,12 @@ class OutboundMessageFragments { return; } int active = peer.add(state); - synchronized (_activePeers) { - if (!_activePeers.contains(peer)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); - _activePeers.add(peer); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); - } - _activePeers.notifyAll(); - } - //msg.timestamp("made active along with: " + active); + add(peer); _context.statManager().addRateData("udp.outboundActiveCount", active, 0); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Error initializing " + msg); } - //finishMessages(); } /** @@ -174,149 +184,186 @@ class OutboundMessageFragments { if (peer == null) throw new RuntimeException("wtf, null peer for " + state); int active = peer.add(state); - synchronized (_activePeers) { - if (!_activePeers.contains(peer)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); - if (_activePeers.isEmpty()) - _lastCycleTime = System.currentTimeMillis(); - _activePeers.add(peer); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); - } - _activePeers.notifyAll(); - } + add(peer); _context.statManager().addRateData("udp.outboundActiveCount", active, 0); - // should we finish messages here too? - /* - synchronized (_activeMessages) { - _activeMessages.add(state); - if (_activeMessages.size() == 1) + } + + /** + * Add the peer to the list of peers wanting to transmit something. + * This wakes up the packet pusher if it is sleeping. + * + * Avoid synchronization where possible. + * There are small chances of races. + * There are larger chances of adding the PeerState "behind" where + * the iterator is now... but these issues are the same as before concurrentification. + * + * @since 0.8.9 + */ + public void add(PeerState peer) { + boolean wasEmpty = _activePeers.isEmpty(); + boolean added = _activePeers.add(peer); + if (added) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); + if (wasEmpty) _lastCycleTime = System.currentTimeMillis(); - _activeMessages.notifyAll(); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); + } + _context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size(), 0); + + // Avoid sync if possible + // no, this doesn't always work. + // Also note that the iterator in getNextVolley may have alreay passed us, + // or not reflect the addition. + if (_isWaiting || wasEmpty) { + synchronized (_activePeers) { + _activePeers.notifyAll(); + } } - */ } /** * Remove any expired or complete messages */ +/**** private void finishMessages() { - int rv = 0; - List peers = null; - synchronized (_activePeers) { - peers = new ArrayList(_activePeers.size()); - for (int i = 0; i < _activePeers.size(); i++) { - PeerState state = _activePeers.get(i); - if (state.getOutboundMessageCount() <= 0) { - _activePeers.remove(i); - i--; - } else { - peers.add(state); - } - } - _activePeers.notifyAll(); - } - for (int i = 0; i < peers.size(); i++) { - PeerState state = (PeerState)peers.get(i); - int remaining = state.finishMessages(); - if (remaining <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("No more pending messages for " + state.getRemotePeer().toBase64()); - } - rv += remaining; - } - } - + for (Iterator iter = _activePeers.iterator(); iter.hasNext(); ) { + PeerState state = iter.next(); + if (state.getOutboundMessageCount() <= 0) { + iter.remove(); + } else { + int remaining = state.finishMessages(); + if (remaining <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No more pending messages for " + state.getRemotePeer().toBase64()); + iter.remove(); + } + } + } + } +****/ + /** * Fetch all the packets for a message volley, blocking until there is a * message which can be fully transmitted (or the transport is shut down). * The returned array may be sparse, with null packets taking the place of * already ACKed fragments. * + * NOT thread-safe. Called by the PacketPusher thread only. + * + * @return null only on shutdown */ public UDPPacket[] getNextVolley() { PeerState peer = null; OutboundMessageState state = null; + // Keep track of how many we've looked at, since we don't start the iterator at the beginning. + int peersProcessed = 0; while (_alive && (state == null) ) { - long now = _context.clock().now(); - int nextSendDelay = -1; - finishMessages(); - try { - synchronized (_activePeers) { - for (int i = 0; i < _activePeers.size(); i++) { - int cur = (i + _nextPeer) % _activePeers.size(); - if (cur == 0) { - // FIXME or delete, these stats aren't much help since they include the sleep time - long ts = System.currentTimeMillis(); - long cycleTime = ts - _lastCycleTime; - _lastCycleTime = ts; - _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size()); - // make longer than the default sleep time below - if (cycleTime > 1100) - _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size()); + int nextSendDelay = Integer.MAX_VALUE; + // no, not every time - O(n**2) - do just before waiting below + //finishMessages(); + + // do we need a new long-lived iterator? + if (_iterator == null || + ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) { + _iterator = _activePeers.iterator(); + } + + // Go through all the peers that we are actively sending messages to. + // Call finishMessages() for each one, and remove them from the iterator + // if there is nothing left to send. + // Otherwise, return the volley to be sent. + // Otherwise, wait() + while (_iterator.hasNext()) { + peer = _iterator.next(); + int remaining = peer.finishMessages(); + if (remaining <= 0) { + // race with add() + _iterator.remove(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No more pending messages for " + peer.getRemotePeer().toBase64()); + continue; } - peer = _activePeers.get(i); + peersProcessed++; state = peer.allocateSend(); if (state != null) { // we have something to send and we will be returning it - _nextPeer = i + 1; + break; + } else if (peersProcessed >= _activePeers.size()) { + // we've gone all the way around, time to sleep break; } else { - // Update the minimum delay for all peers (getNextDelay() returns 1 for "now") + // Update the minimum delay for all peers // which will be used if we found nothing to send across all peers int delay = peer.getNextDelay(); - if ( (nextSendDelay <= 0) || (delay < nextSendDelay) ) + if (delay < nextSendDelay) nextSendDelay = delay; peer = null; - state = null; } } - if (_log.shouldLog(Log.DEBUG)) + + if (peer != null && _log.shouldLog(Log.DEBUG)) _log.debug("Done looping, next peer we are sending for: " + - (peer != null ? peer.getRemotePeer().toBase64() : "none")); - if (state == null) { + peer.getRemotePeer().toBase64()); + + // if we've gone all the way through the loop, wait + // ... unless nextSendDelay says we have more ready now + if (state == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) { + _isWaiting = true; + peersProcessed = 0; + // why? we do this in the loop one at a time + //finishMessages(); + // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says + // use max of 1 second so finishMessages() and/or PeerState.finishMessages() + // gets called regularly + int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT); if (_log.shouldLog(Log.DEBUG)) - _log.debug("wait for " + nextSendDelay); + _log.debug("wait for " + toWait); // wait.. or somethin' - // wait a min of 10 and a max of 3000 ms no matter what peer.getNextDelay() says - if (nextSendDelay > 0) - _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), 3000)); - else - _activePeers.wait(1000); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("dont wait: alive=" + _alive + " state = " + state); + synchronized (_activePeers) { + try { + _activePeers.wait(toWait); + } catch (InterruptedException ie) { + // noop + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Woken up while waiting"); + } + } + _isWaiting = false; + //} else { + // if (_log.shouldLog(Log.DEBUG)) + // _log.debug("dont wait: alive=" + _alive + " state = " + state); } - } - } catch (InterruptedException ie) { - // noop - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Woken up while waiting"); - } - } + + } // while alive && state == null if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending " + state); UDPPacket packets[] = preparePackets(state, peer); + + /**** if ( (state != null) && (state.getMessage() != null) ) { int valid = 0; for (int i = 0; packets != null && i < packets.length ; i++) if (packets[i] != null) valid++; - /* state.getMessage().timestamp("sending a volley of " + valid + " lastReceived: " + (_context.clock().now() - peer.getLastReceiveTime()) + " lastSentFully: " + (_context.clock().now() - peer.getLastSendFullyTime())); - */ } + ****/ + return packets; } + /** + * @return null if state or peer is null + */ private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) { if ( (state != null) && (peer != null) ) { int fragments = state.getFragmentCount(); @@ -397,37 +444,6 @@ class OutboundMessageFragments { } } - /** - * We received an ACK of the given messageId from the given peer, so if it - * is still unacked, mark it as complete. - * - * @return fragments acked - */ - public int acked(long messageId, Hash ackedBy) { - PeerState peer = _transport.getPeerState(ackedBy); - if (peer != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("acked [" + messageId + "] by " + ackedBy.toBase64()); - return peer.acked(messageId); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("acked [" + messageId + "] by an unknown remote peer? " + ackedBy.toBase64()); - return 0; - } - } - - public void acked(ACKBitfield bitfield, Hash ackedBy) { - PeerState peer = _transport.getPeerState(ackedBy); - if (peer != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64()); - peer.acked(bitfield); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64()); - } - } - public interface ActiveThrottle { public void choke(Hash peer); public void unchoke(Hash peer); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 0e5ed7ef36..362ce4f4ff 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -12,12 +12,12 @@ import net.i2p.util.ByteCache; import net.i2p.util.Log; /** - * Maintain the outbound fragmentation for resending + * Maintain the outbound fragmentation for resending, for a single message. * */ class OutboundMessageState { - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; /** may be null if we are part of the establishment */ private OutNetMessage _message; private long _messageId; @@ -49,6 +49,7 @@ class OutboundMessageState { _log = _context.logManager().getLog(OutboundMessageState.class); } +/**** public boolean initialize(OutNetMessage msg) { if (msg == null) return false; try { @@ -60,7 +61,11 @@ class OutboundMessageState { return false; } } +****/ + /** + * Called from UDPTransport + */ public boolean initialize(I2NPMessage msg, PeerState peer) { if (msg == null) return false; @@ -75,6 +80,9 @@ class OutboundMessageState { } } + /** + * Called from OutboundMessageFragments + */ public boolean initialize(OutNetMessage m, I2NPMessage msg) { if ( (m == null) || (msg == null) ) return false; @@ -198,6 +206,7 @@ class OutboundMessageState { sends[i] = (short)-1; boolean rv = isComplete(); + /**** if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY); //_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO); @@ -210,6 +219,7 @@ class OutboundMessageState { // _nextSendTime = now + 100; //_nextSendTime = now; } + ****/ return rv; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index cc43c4b2e3..a96c9d1cf4 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -95,9 +95,9 @@ around briefly, to address packet loss and reordering.

* */ class PacketBuilder { - private I2PAppContext _context; - private Log _log; - private UDPTransport _transport; + private final I2PAppContext _context; + private final Log _log; + private final UDPTransport _transport; private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE); private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH); @@ -656,6 +656,8 @@ class PacketBuilder { /** * Build a destroy packet, which contains a header but no body. + * Session must be established or this will NPE in authenticate(). + * Unused until 0.8.9. * * @since 0.8.1 */ diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 1355f09e90..972632077a 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -372,8 +372,8 @@ class PacketHandler { if (state.getMACKey() != null) { isValid = packet.validate(state.getMACKey()); if (isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for inbound con: " + packet); + if (_log.shouldLog(Log.INFO)) + _log.info("Valid introduction packet received for inbound con: " + packet); _state = 32; packet.decrypt(state.getCipherKey()); @@ -418,8 +418,8 @@ class PacketHandler { _state = 36; isValid = packet.validate(state.getMACKey()); if (isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for outbound established con: " + packet); + if (_log.shouldLog(Log.INFO)) + _log.info("Valid introduction packet received for outbound established con: " + packet); _state = 37; packet.decrypt(state.getCipherKey()); @@ -432,8 +432,8 @@ class PacketHandler { // keys not yet exchanged, lets try it with the peer's intro key isValid = packet.validate(state.getIntroKey()); if (isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet); + if (_log.shouldLog(Log.INFO)) + _log.info("Valid introduction packet received for outbound established con with old intro key: " + packet); _state = 39; packet.decrypt(state.getIntroKey()); handlePacket(reader, packet, null, state, null); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index 50b8d3ba78..7b4d7f3e24 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -11,10 +11,10 @@ import net.i2p.util.Log; */ class PacketPusher implements Runnable { // private RouterContext _context; - private Log _log; - private OutboundMessageFragments _fragments; - private UDPSender _sender; - private boolean _alive; + private final Log _log; + private final OutboundMessageFragments _fragments; + private final UDPSender _sender; + private volatile boolean _alive; public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) { // _context = ctx; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 1006f52a32..fdc8aa17cb 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -24,8 +24,8 @@ import net.i2p.util.ConcurrentHashSet; * */ class PeerState { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; /** * The peer are we talking to. This should be set as soon as this * state is created if we are initiating a connection, but if we are @@ -157,7 +157,7 @@ class PeerState { /* how many consecutive packets at or under the min MTU have been received */ private long _consecutiveSmall; /** when did we last check the MTU? */ - private long _mtuLastChecked; + //private long _mtuLastChecked; private long _mtuIncreases; private long _mtuDecreases; /** current round trip time estimate */ @@ -192,7 +192,7 @@ class PeerState { /** which outbound message is currently being retransmitted */ private OutboundMessageState _retransmitter; - private UDPTransport _transport; + private final UDPTransport _transport; /** have we migrated away from this peer to another newer one? */ private volatile boolean _dead; @@ -224,7 +224,7 @@ class PeerState { * of 608 * * Well, we really need to count the acks as well, especially - * 4 * MAX_RESEND_ACKS which can take up a significant amount of space. + * 1 + (4 * MAX_RESEND_ACKS_SMALL) which can take up a significant amount of space. * We reduce the max acks when using the small MTU but it may not be enough... * */ @@ -234,8 +234,14 @@ class PeerState { * based on measurements, 1350 fits nearly all reasonably small I2NP messages * (larger I2NP messages may be up to 1900B-4500B, which isn't going to fit * into a live network MTU anyway) + * + * TODO + * VTBM is 2646, it would be nice to fit in two large + * 2646 / 2 = 1323 + * 1323 + 74 + 46 + 1 + (4 * 9) = 1480 + * So why not make it 1492 (old ethernet is 1492, new is 1500) */ - private static final int LARGE_MTU = 1350; + private static final int LARGE_MTU = 1492; private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY; private static final int MAX_RTO = 3000; // 5000; @@ -261,25 +267,14 @@ class PeerState { _remotePort = -1; _mtu = getDefaultMTU(); _mtuReceive = _mtu; - _mtuLastChecked = -1; + //_mtuLastChecked = -1; _lastACKSend = -1; _rto = MIN_RTO; _rtt = _rto/2; _rttDeviation = _rtt; _inboundMessages = new HashMap(8); _outboundMessages = new ArrayList(32); - _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES); + // all createRateStat() moved to EstablishmentManager } private int getDefaultMTU() { @@ -381,13 +376,20 @@ class PeerState { public long getTheyRelayToUsAs() { return _theyRelayToUsAs; } /** what is the largest packet we can send to the peer? */ public int getMTU() { return _mtu; } - /** estimate how large the other side is sending packets */ + + /** + * Estimate how large the other side's MTU is. + * This could be wrong. + * It is used only for the HTML status. + */ public int getReceiveMTU() { return _mtuReceive; } + /** when did we last check the MTU? */ + /**** public long getMTULastChecked() { return _mtuLastChecked; } public long getMTUIncreases() { return _mtuIncreases; } public long getMTUDecreases() { return _mtuDecreases; } - + ****/ /** * The peer are we talking to. This should be set as soon as this @@ -548,11 +550,15 @@ class PeerState { * we can use to publish that fact. */ public void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; } + /** what is the largest packet we can send to the peer? */ + /**** public void setMTU(int mtu) { _mtu = mtu; _mtuLastChecked = _context.clock().now(); } + ****/ + public int getSlowStartThreshold() { return _slowStartThreshold; } public int getConcurrentSends() { return _concurrentMessagesActive; } public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; } @@ -990,15 +996,21 @@ class PeerState { public long getPacketRetransmissionRate() { return _packetRetransmissionRate; } public long getPacketsReceived() { return _packetsReceived; } public long getPacketsReceivedDuplicate() { return _packetsReceivedDuplicate; } + + private static final int MTU_RCV_DISPLAY_THRESHOLD = 20; + public void packetReceived(int size) { _packetsReceived++; - if (size <= MIN_MTU) + if (size <= MIN_MTU) { _consecutiveSmall++; - else + } else { _consecutiveSmall = 0; + _mtuReceive = LARGE_MTU; + return; + } - if (_packetsReceived > 50) { - if (_consecutiveSmall < 50) + if (_packetsReceived > MTU_RCV_DISPLAY_THRESHOLD) { + if (_consecutiveSmall < MTU_RCV_DISPLAY_THRESHOLD) _mtuReceive = LARGE_MTU; else _mtuReceive = MIN_MTU; @@ -1061,7 +1073,6 @@ class PeerState { if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); List msgs = _outboundMessages; - if (msgs == null) return 0; int rv = 0; boolean fail = false; synchronized (msgs) { @@ -1070,11 +1081,14 @@ class PeerState { // 32 queued messages? to *one* peer? nuh uh. fail = true; rv--; + + /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless + } else if (_retransmitter != null) { long lifetime = _retransmitter.getLifetime(); long totalLifetime = lifetime; for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter - OutboundMessageState cur = (OutboundMessageState)msgs.get(i); + OutboundMessageState cur = msgs.get(i); totalLifetime += cur.getLifetime(); } long remaining = -1; @@ -1103,6 +1117,9 @@ class PeerState { _context.statManager().addRateData("udp.queueAllowTotalLifetime", totalLifetime, lifetime); msgs.add(state); } + + *******/ + } else { msgs.add(state); } @@ -1111,6 +1128,7 @@ class PeerState { _transport.failed(state, false); return rv; } + /** drop all outbound messages */ public void dropOutbound() { //if (_dead) return; @@ -1118,7 +1136,7 @@ class PeerState { List msgs = _outboundMessages; //_outboundMessages = null; _retransmitter = null; - if (msgs != null) { + int sz = 0; List tempList = null; synchronized (msgs) { @@ -1130,21 +1148,17 @@ class PeerState { } for (int i = 0; i < sz; i++) _transport.failed(tempList.get(i), false); - } + // so the ACKSender will drop this peer from its queue _wantACKSendSince = -1; } + /** + * @return number of active outbound messages remaining (unsynchronized) + */ public int getOutboundMessageCount() { - List msgs = _outboundMessages; if (_dead) return 0; - if (msgs != null) { - synchronized (msgs) { - return msgs.size(); - } - } else { - return 0; - } + return _outboundMessages.size(); } /** @@ -1152,39 +1166,37 @@ class PeerState { * @return number of active outbound messages remaining */ public int finishMessages() { - int rv = 0; List msgs = _outboundMessages; + // short circuit, unsynchronized + if (msgs.isEmpty()) + return 0; + if (_dead) { dropOutbound(); return 0; } + + int rv = 0; List succeeded = null; List failed = null; synchronized (msgs) { - int size = msgs.size(); - for (int i = 0; i < size; i++) { - OutboundMessageState state = msgs.get(i); + for (Iterator iter = msgs.iterator(); iter.hasNext(); ) { + OutboundMessageState state = iter.next(); if (state.isComplete()) { - msgs.remove(i); - i--; - size--; + iter.remove(); if (_retransmitter == state) _retransmitter = null; if (succeeded == null) succeeded = new ArrayList(4); succeeded.add(state); } else if (state.isExpired()) { - msgs.remove(i); - i--; - size--; + iter.remove(); if (_retransmitter == state) _retransmitter = null; _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); if (failed == null) failed = new ArrayList(4); failed.add(state); } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { - msgs.remove(i); - i--; - size--; + iter.remove(); if (state == _retransmitter) _retransmitter = null; _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); @@ -1232,9 +1244,7 @@ class PeerState { List msgs = _outboundMessages; if (_dead) return null; synchronized (msgs) { - int size = msgs.size(); - for (int i = 0; i < size; i++) { - OutboundMessageState state = msgs.get(i); + for (OutboundMessageState state : msgs) { if (locked_shouldSend(state)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId()); @@ -1261,28 +1271,22 @@ class PeerState { } /** - * return how long to wait before sending, or -1 if we have nothing to send + * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send. + * If ready now, will return 0 or a negative value. */ public int getNextDelay() { - int rv = -1; + int rv = Integer.MAX_VALUE; + if (_dead) return rv; long now = _context.clock().now(); List msgs = _outboundMessages; - if (_dead) return -1; synchronized (msgs) { if (_retransmitter != null) { rv = (int)(_retransmitter.getNextSendTime() - now); - if (rv <= 0) - return 1; - else - return rv; + return rv; } - int size = msgs.size(); - for (int i = 0; i < size; i++) { - OutboundMessageState state = msgs.get(i); + for (OutboundMessageState state : msgs) { int delay = (int)(state.getNextSendTime() - now); - if (delay <= 0) - delay = 1; - if ( (rv <= 0) || (delay < rv) ) + if (delay < rv) rv = delay; } } @@ -1306,7 +1310,11 @@ class PeerState { private static final int SSU_HEADER_SIZE = 46; static final int UDP_HEADER_SIZE = 8; static final int IP_HEADER_SIZE = 20; - /** how much payload data can we shove in there? */ + + /** + * how much payload data can we shove in there? + * @return MTU - 74 + */ private static final int fragmentSize(int mtu) { return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE; } @@ -1315,7 +1323,7 @@ class PeerState { long now = _context.clock().now(); if (state.getNextSendTime() <= now) { if (!state.isFragmented()) { - state.fragment(fragmentSize(getMTU())); + state.fragment(fragmentSize(_mtu)); if (state.getMessage() != null) state.getMessage().timestamp("fragment into " + state.getFragmentCount()); @@ -1372,14 +1380,14 @@ class PeerState { _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); //if (state.getMessage() != null) // state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + if (_log.shouldLog(Log.INFO)) + _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + " available=" + getSendWindowBytesRemaining() + " for message " + state.getMessageId() + ": " + state); state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) + _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK); - if (_log.shouldLog(Log.WARN)) - _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); + if (_log.shouldLog(Log.INFO)) + _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); //_throttle.choke(peer.getRemotePeer()); //if (state.getMessage() != null) @@ -1393,16 +1401,20 @@ class PeerState { return false; } - public int acked(long messageId) { + /** + * A full ACK was received. + * + * @return true if the message was acked for the first time + */ + public boolean acked(long messageId) { + if (_dead) return false; OutboundMessageState state = null; List msgs = _outboundMessages; - if (_dead) return 0; synchronized (msgs) { - int sz = msgs.size(); - for (int i = 0; i < sz; i++) { - state = msgs.get(i); + for (Iterator iter = msgs.iterator(); iter.hasNext(); ) { + state = iter.next(); if (state.getMessageId() == messageId) { - msgs.remove(i); + iter.remove(); break; } else { state = null; @@ -1438,22 +1450,25 @@ class PeerState { // _throttle.unchoke(peer.getRemotePeer()); state.releaseResources(); - return numFragments; } else { // dupack, likely if (_log.shouldLog(Log.DEBUG)) _log.debug("Received an ACK for a message not pending: " + messageId); - return 0; } + return state != null; } - public void acked(ACKBitfield bitfield) { + /** + * A partial ACK was received. This is much less common than full ACKs. + * + * @return true if the message was completely acked for the first time + */ + public boolean acked(ACKBitfield bitfield) { if (_dead) - return; + return false; if (bitfield.receivedComplete()) { - acked(bitfield.getMessageId()); - return; + return acked(bitfield.getMessageId()); } List msgs = _outboundMessages; @@ -1461,13 +1476,13 @@ class PeerState { OutboundMessageState state = null; boolean isComplete = false; synchronized (msgs) { - for (int i = 0; i < msgs.size(); i++) { - state = msgs.get(i); + for (Iterator iter = msgs.iterator(); iter.hasNext(); ) { + state = iter.next(); if (state.getMessageId() == bitfield.getMessageId()) { boolean complete = state.acked(bitfield); if (complete) { isComplete = true; - msgs.remove(i); + iter.remove(); if (state == _retransmitter) _retransmitter = null; } @@ -1514,12 +1529,12 @@ class PeerState { //if (state.getMessage() != null) // state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); } - return; + return isComplete; } else { // dupack if (_log.shouldLog(Log.DEBUG)) _log.debug("Received an ACK for a message not pending: " + bitfield); - return; + return false; } } @@ -1581,6 +1596,8 @@ class PeerState { } } + // why removed? Some risk of dups in OutboundMessageFragments._activePeers ??? + /* public int hashCode() { if (_remotePeer != null) diff --git a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java index 46b1e46668..752c5d37be 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java @@ -91,17 +91,17 @@ with either Bob or Charlie, but it is not required.

*/ class PeerTestManager { - private RouterContext _context; - private Log _log; - private UDPTransport _transport; - private PacketBuilder _packetBuilder; + private final RouterContext _context; + private final Log _log; + private final UDPTransport _transport; + private final PacketBuilder _packetBuilder; /** map of Long(nonce) to PeerTestState for tests currently in progress (as Bob/Charlie) */ private final Map _activeTests; /** current test we are running (as Alice), or null */ private PeerTestState _currentTest; private boolean _currentTestComplete; /** as Alice */ - private Queue _recentTests; + private final Queue _recentTests; /** longest we will keep track of a Charlie nonce for */ private static final int MAX_CHARLIE_LIFETIME = 10*1000; diff --git a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java index 86fe2c8113..2973f89934 100644 --- a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java +++ b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java @@ -17,7 +17,7 @@ import net.i2p.util.Log; * with code to fail messages that expire. * * WARNING - UNUSED since 0.6.1.11 - * See comments in DQAT.java and mtn history ca. 2006-02-19 + * See comments in DummyThrottle.java and mtn history ca. 2006-02-19 * */ class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index 1773095125..6017ebcf8e 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -137,7 +137,7 @@ class UDPEndpoint { * Add the packet to the outobund queue to be sent ASAP (as allowed by * the bandwidth limiter) * - * @return number of packets in the send queue + * @return ZERO (used to be number of packets in the queue) */ public int send(UDPPacket packet) { if (_sender == null) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index 092431db37..3dd97a7040 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -19,13 +19,13 @@ import net.i2p.util.Log; class UDPPacket { private I2PAppContext _context; private static Log _log; - private volatile DatagramPacket _packet; + private final DatagramPacket _packet; private volatile short _priority; private volatile long _initializeTime; private volatile long _expiration; - private byte[] _data; - private byte[] _validateBuf; - private byte[] _ivBuf; + private final byte[] _data; + private final byte[] _validateBuf; + private final byte[] _ivBuf; private volatile int _markedType; private volatile RemoteHostId _remoteHost; private volatile boolean _released; @@ -51,7 +51,13 @@ class UDPPacket { _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); } - static final int MAX_PACKET_SIZE = 2048; + /** + * Actually it is one less than this, we assume + * if a received packet is this big it is truncated. + * This is bigger than PeerState.LARGE_MTU, as the far-end's + * LARGE_MTU may be larger than ours. + */ + static final int MAX_PACKET_SIZE = 1536; public static final int IV_SIZE = 16; public static final int MAC_SIZE = 16; @@ -81,21 +87,26 @@ class UDPPacket { private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE; - private UDPPacket(I2PAppContext ctx, boolean inbound) { + private UDPPacket(I2PAppContext ctx) { ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES); // the data buffer is clobbered on init(..), but we need it to bootstrap _data = new byte[MAX_PACKET_SIZE]; _packet = new DatagramPacket(_data, MAX_PACKET_SIZE); _validateBuf = new byte[MAX_VALIDATE_SIZE]; _ivBuf = new byte[IV_SIZE]; - init(ctx, inbound); + init(ctx); } - // FIXME optimization, remove the inbound parameter, as it is unused. FIXME - private void init(I2PAppContext ctx, boolean inbound) { + + private void init(I2PAppContext ctx) { _context = ctx; //_dataBuf = _dataCache.acquire(); Arrays.fill(_data, (byte)0); //_packet = new DatagramPacket(_data, MAX_PACKET_SIZE); + // + // WARNING - + // Doesn't seem like we should have to do this every time, + // from reading the DatagramPacket javadocs, + // but we get massive corruption without it. _packet.setData(_data); // _isInbound = inbound; _initializeTime = _context.clock().now(); @@ -262,15 +273,18 @@ class UDPPacket { return buf.toString(); } + /** + * @param inbound unused + */ public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) { UDPPacket rv = null; if (CACHE) { rv = _packetCache.poll(); if (rv != null) - rv.init(ctx, inbound); + rv.init(ctx); } if (rv == null) - rv = new UDPPacket(ctx, inbound); + rv = new UDPPacket(ctx); //if (rv._acquiredBy != null) { // _log.log(Log.CRIT, "Already acquired! current stack trace is:", new Exception()); // _log.log(Log.CRIT, "Earlier acquired:", rv._acquiredBy); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index 493d627718..6421ed93ef 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -16,19 +16,19 @@ import net.i2p.util.Log; * */ class UDPPacketReader { - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; private byte _message[]; private int _payloadBeginOffset; private int _payloadLength; - private SessionRequestReader _sessionRequestReader; - private SessionCreatedReader _sessionCreatedReader; - private SessionConfirmedReader _sessionConfirmedReader; - private DataReader _dataReader; - private PeerTestReader _peerTestReader; - private RelayRequestReader _relayRequestReader; - private RelayIntroReader _relayIntroReader; - private RelayResponseReader _relayResponseReader; + private final SessionRequestReader _sessionRequestReader; + private final SessionCreatedReader _sessionCreatedReader; + private final SessionConfirmedReader _sessionConfirmedReader; + private final DataReader _dataReader; + private final PeerTestReader _peerTestReader; + private final RelayRequestReader _relayRequestReader; + private final RelayIntroReader _relayIntroReader; + private final RelayResponseReader _relayResponseReader; private static final int KEYING_MATERIAL_LENGTH = 64; @@ -354,7 +354,9 @@ class UDPPacketReader { off++; // fragment info return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF; } - public void readMessageFragment(int fragmentNum, byte target[], int targetOffset) { + + public void readMessageFragment(int fragmentNum, byte target[], int targetOffset) + throws ArrayIndexOutOfBoundsException { int off = getFragmentBegin(fragmentNum); off += 4; // messageId off++; // fragment info diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 647a5639f2..bdf5b1e694 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -243,6 +243,10 @@ class UDPReceiver { // and block after we know how much we read but before // we release the packet to the inbound queue + if (size >= UDPPacket.MAX_PACKET_SIZE) { + // DatagramSocket javadocs: If the message is longer than the packet's length, the message is truncated. + throw new IOException("packet too large! truncated and dropped"); + } if (size > 0) { //FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); //_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver"); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 4c7b97bcaf..b39e78f453 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -16,13 +16,13 @@ import net.i2p.util.Log; * */ class UDPSender { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; private DatagramSocket _socket; private String _name; private final BlockingQueue _outboundQueue; private boolean _keepRunning; - private Runner _runner; + private final Runner _runner; private static final int TYPE_POISON = 99999; //private static final int MAX_QUEUED = 4; @@ -91,7 +91,7 @@ class UDPSender { * available, if requested, otherwise it returns immediately * * @param blockTime how long to block IGNORED - * @return number of packets queued + * @return ZERO (used to be number of packets in the queue) * @deprecated use add(packet) */ public int add(UDPPacket packet, int blockTime) { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index b49906c43f..ec3e69482c 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -64,6 +64,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private final IntroductionManager _introManager; private final ExpirePeerEvent _expireEvent; private final PeerTestEvent _testEvent; + private final PacketBuilder _destroyBuilder; private short _reachabilityStatus; private long _reachabilityStatusLastUpdated; private long _introducersSelectedOn; @@ -184,7 +185,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _peersByRemoteHost = new ConcurrentHashMap(128); _dropList = new ConcurrentHashSet(2); - // See comments in DQAT.java + // See comments in DummyThrottle.java if (USE_PRIORITY) { TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); _outboundMessages = mq; @@ -200,6 +201,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _cachedBid[i] = new SharedBid(BID_VALUES[i]); } + _destroyBuilder = new PacketBuilder(_context, this); _fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _inboundFragments = new InboundMessageFragments(_context, _fragments, this); if (SHOULD_FLOOD_PEERS) @@ -296,7 +298,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_handler == null) _handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager); - // See comments in DQAT.java + // See comments in DummyThrottle.java if (USE_PRIORITY && _refiller == null) _refiller = new OutboundRefiller(_context, _fragments, _outboundMessages); @@ -337,6 +339,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } public void shutdown() { + destroyAll(); if (_endpoint != null) _endpoint.shutdown(); if (_flooder != null) @@ -345,14 +348,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _refiller.shutdown(); if (_handler != null) _handler.shutdown(); - _fragments.shutdown(); if (_pusher != null) _pusher.shutdown(); + _fragments.shutdown(); if (_establisher != null) _establisher.shutdown(); _inboundFragments.shutdown(); _expireEvent.setIsAlive(false); _testEvent.setIsAlive(false); + _peersByRemoteHost.clear(); + _peersByIdent.clear(); + _dropList.clear(); + _introManager.reset(); } /** @@ -1011,12 +1018,53 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority */ } + /** + * This sends it directly out, bypassing OutboundMessageFragments + * and the PacketPusher. The only queueing is for the bandwidth limiter. + * + * @return ZERO (used to be number of packets in the queue) + */ int send(UDPPacket packet) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending packet " + packet); return _endpoint.send(packet); } + /** + * Send a session destroy message, bypassing OMF and PacketPusher. + * + * @since 0.8.9 + */ + private void sendDestroy(PeerState peer) { + // peer must be fully established + if (peer.getCurrentCipherKey() == null) + return; + UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer); + if (_log.shouldLog(Log.WARN)) + _log.warn("Sending destroy to : " + peer); + send(pkt); + } + + /** + * Send a session destroy message to everybody + * + * @since 0.8.9 + */ + private void destroyAll() { + int howMany = _peersByIdent.size(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Sending destroy to : " + howMany + " peers"); + for (PeerState peer : _peersByIdent.values()) { + sendDestroy(peer); + } + int toSleep = Math.min(howMany / 3, 750); + if (toSleep > 0) { + try { + Thread.sleep(toSleep); + } catch (InterruptedException ie) {} + } + } + /** minimum active peers to maintain IP detection, etc. */ private static final int MIN_PEERS = 3; /** minimum peers volunteering to be introducers if we need that */ @@ -1112,6 +1160,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final int MIN_EXPIRE_TIMEOUT = 10*60*1000; public String getStyle() { return STYLE; } + @Override public void send(OutNetMessage msg) { if (msg == null) return; @@ -1151,7 +1200,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_log.shouldLog(Log.DEBUG)) _log.debug("Add to fragments for " + to.toBase64()); - // See comments in DQAT.java + // See comments in DummyThrottle.java if (USE_PRIORITY) _outboundMessages.add(msg); else // skip the priority queue and go straight to the active pool @@ -1163,6 +1212,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _establisher.establish(msg); } } + void send(I2NPMessage msg, PeerState peer) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Injecting a data message to a new peer: " + peer); @@ -2234,8 +2284,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } - for (int i = 0; i < _expireBuffer.size(); i++) - dropPeer(_expireBuffer.get(i), false, "idle too long"); + for (PeerState peer : _expireBuffer) { + sendDestroy(peer); + dropPeer(peer, false, "idle too long"); + } _expireBuffer.clear(); if (_alive)