propagate from branch 'i2p.i2p.zzz.test4' (head a1d80c1c396eaa49c7b46a69397b36fe9717ff2e)

to branch 'i2p.i2p' (head 7d00d6f11ce1172c218ce44b0a8ac28e4addf03d)
This commit is contained in:
zzz
2011-08-24 14:24:25 +00:00
22 changed files with 484 additions and 331 deletions

View File

@ -8,8 +8,6 @@ package net.i2p.router;
* *
*/ */
import java.util.Comparator;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**

View File

@ -14,13 +14,14 @@ import net.i2p.util.Log;
/** /**
* Blocking thread that is given peers by the inboundFragment pool, sending out * Blocking thread that is given peers by the inboundFragment pool, sending out
* any outstanding ACKs. * any outstanding ACKs.
* * The ACKs are sent directly to UDPSender,
* bypassing OutboundMessageFragments and PacketPusher.
*/ */
class ACKSender implements Runnable { class ACKSender implements Runnable {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private UDPTransport _transport; private final UDPTransport _transport;
private PacketBuilder _builder; private final PacketBuilder _builder;
/** list of peers (PeerState) who we have received data from but not yet ACKed to */ /** list of peers (PeerState) who we have received data from but not yet ACKed to */
private final BlockingQueue<PeerState> _peersToACK; private final BlockingQueue<PeerState> _peersToACK;
private boolean _alive; private boolean _alive;

View File

@ -72,6 +72,19 @@ class EstablishmentManager {
_context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES);
// following are for PeerState
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
} }
public void startup() { public void startup() {
@ -318,6 +331,7 @@ class EstablishmentManager {
/** /**
* Got a SessionDestroy on an established conn * Got a SessionDestroy on an established conn
* @since 0.8.1
*/ */
void receiveSessionDestroy(RemoteHostId from, PeerState state) { void receiveSessionDestroy(RemoteHostId from, PeerState state) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -327,6 +341,7 @@ class EstablishmentManager {
/** /**
* Got a SessionDestroy during outbound establish * Got a SessionDestroy during outbound establish
* @since 0.8.1
*/ */
void receiveSessionDestroy(RemoteHostId from, OutboundEstablishState state) { void receiveSessionDestroy(RemoteHostId from, OutboundEstablishState state) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -338,6 +353,7 @@ class EstablishmentManager {
/** /**
* Got a SessionDestroy - maybe after an inbound establish * Got a SessionDestroy - maybe after an inbound establish
* @since 0.8.1
*/ */
void receiveSessionDestroy(RemoteHostId from) { void receiveSessionDestroy(RemoteHostId from) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@ -18,14 +18,14 @@ import net.i2p.util.Log;
* *
*/ */
class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
/** list of message IDs recently received, so we can ignore in flight dups */ /** list of message IDs recently received, so we can ignore in flight dups */
private DecayingBloomFilter _recentlyCompletedMessages; private DecayingBloomFilter _recentlyCompletedMessages;
private OutboundMessageFragments _outbound; private final OutboundMessageFragments _outbound;
private UDPTransport _transport; private final UDPTransport _transport;
private ACKSender _ackSender; private final ACKSender _ackSender;
private MessageReceiver _messageReceiver; private final MessageReceiver _messageReceiver;
private boolean _alive; private boolean _alive;
/** decay the recently completed every 20 seconds */ /** decay the recently completed every 20 seconds */
@ -148,8 +148,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
from.messageFullyReceived(messageId, state.getCompleteSize()); from.messageFullyReceived(messageId, state.getCompleteSize());
_ackSender.ackPeer(from); _ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Message received completely! " + state); _log.debug("Message received completely! " + state);
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 0) if (state.getFragmentCount() > 0)
@ -158,7 +158,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
state.releaseResources(); state.releaseResources();
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state); _log.warn("Message expired while only being partially read: " + state);
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString()); _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
} else if (partialACK) { } else if (partialACK) {
// not expired but not yet complete... lets queue up a partial ACK // not expired but not yet complete... lets queue up a partial ACK
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -174,10 +174,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
return fragments; return fragments;
} }
/**
* @return the number of bitfields in the ack? why?
*/
private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) { private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) {
int rv = 0; int rv = 0;
boolean newAck = false;
if (data.readACKsIncluded()) { if (data.readACKsIncluded()) {
int fragments = 0;
int ackCount = data.readACKCount(); int ackCount = data.readACKCount();
if (ackCount > 0) { if (ackCount > 0) {
rv += ackCount; rv += ackCount;
@ -186,9 +189,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
for (int i = 0; i < ackCount; i++) { for (int i = 0; i < ackCount; i++) {
long id = data.readACK(i); long id = data.readACK(i);
if (_log.shouldLog(Log.INFO)) if (from.acked(id)) {
_log.info("Full ACK of message " + id + " received!"); if (_log.shouldLog(Log.DEBUG))
fragments += _outbound.acked(id, from.getRemotePeer()); _log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer());
newAck = true;
//} else if (_log.shouldLog(Log.DEBUG)) {
// _log.debug("Dup full ACK of message " + id + " received from " + from.getRemotePeer());
}
} }
} else { } else {
_log.error("Received ACKs with no acks?! " + data); _log.error("Received ACKs with no acks?! " + data);
@ -201,9 +208,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
//_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0); //_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0);
for (int i = 0; i < bitfields.length; i++) { for (int i = 0; i < bitfields.length; i++) {
if (_log.shouldLog(Log.INFO)) if (from.acked(bitfields[i])) {
_log.info("Partial ACK received: " + bitfields[i]); if (_log.shouldLog(Log.DEBUG))
_outbound.acked(bitfields[i], from.getRemotePeer()); _log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
newAck = true;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
}
} }
} }
} }
@ -211,6 +222,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
from.ECNReceived(); from.ECNReceived();
else else
from.dataReceived(); from.dataReceived();
// Wake up the packet pusher if it is sleeping.
// By calling add(), this also is a failsafe against possible
// races in OutboundMessageFragments.
if (newAck && from.getOutboundMessageCount() > 0)
_outbound.add(from);
return rv; return rv;
} }
} }

View File

@ -12,20 +12,20 @@ import net.i2p.util.Log;
* *
*/ */
class InboundMessageState { class InboundMessageState {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private long _messageId; private final long _messageId;
private Hash _from; private final Hash _from;
/** /**
* indexed array of fragments for the message, where not yet * indexed array of fragments for the message, where not yet
* received fragments are null. * received fragments are null.
*/ */
private ByteArray _fragments[]; private final ByteArray _fragments[];
/** /**
* what is the last fragment in the message (or -1 if not yet known) * what is the last fragment in the message (or -1 if not yet known)
*/ */
private int _lastFragment; private int _lastFragment;
private long _receiveBegin; private final long _receiveBegin;
private int _completeSize; private int _completeSize;
private boolean _released; private boolean _released;
@ -33,7 +33,8 @@ class InboundMessageState {
private static final long MAX_RECEIVE_TIME = 10*1000; private static final long MAX_RECEIVE_TIME = 10*1000;
public static final int MAX_FRAGMENTS = 64; public static final int MAX_FRAGMENTS = 64;
private static final ByteCache _fragmentCache = ByteCache.getInstance(64, 2048); private static final int MAX_FRAGMENT_SIZE = UDPPacket.MAX_PACKET_SIZE;
private static final ByteCache _fragmentCache = ByteCache.getInstance(64, MAX_FRAGMENT_SIZE);
public InboundMessageState(RouterContext ctx, long messageId, Hash from) { public InboundMessageState(RouterContext ctx, long messageId, Hash from) {
_context = ctx; _context = ctx;
@ -153,10 +154,12 @@ class InboundMessageState {
} }
public void releaseResources() { public void releaseResources() {
if (_fragments != null) for (int i = 0; i < _fragments.length; i++) {
for (int i = 0; i < _fragments.length; i++) if (_fragments[i] != null) {
_fragmentCache.release(_fragments[i]); _fragmentCache.release(_fragments[i]);
//_fragments = null; _fragments[i] = null;
}
}
_released = true; _released = true;
} }
@ -178,7 +181,7 @@ class InboundMessageState {
buf.append(" completely received with "); buf.append(" completely received with ");
buf.append(getCompleteSize()).append(" bytes"); buf.append(getCompleteSize()).append(" bytes");
} else { } else {
for (int i = 0; (_fragments != null) && (i < _fragments.length); i++) { for (int i = 0; i < _lastFragment; i++) {
buf.append(" fragment ").append(i); buf.append(" fragment ").append(i);
if (_fragments[i] != null) if (_fragments[i] != null)
buf.append(": known at size ").append(_fragments[i].getValid()); buf.append(": known at size ").append(_fragments[i].getValid());

View File

@ -21,10 +21,10 @@ import net.i2p.util.Log;
* *
*/ */
class IntroductionManager { class IntroductionManager {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private UDPTransport _transport; private final UDPTransport _transport;
private PacketBuilder _builder; private final PacketBuilder _builder;
/** map of relay tag to PeerState that should receive the introduction */ /** map of relay tag to PeerState that should receive the introduction */
private final Map<Long, PeerState> _outbound; private final Map<Long, PeerState> _outbound;
/** list of peers (PeerState) who have given us introduction tags */ /** list of peers (PeerState) who have given us introduction tags */

View File

@ -20,9 +20,9 @@ import net.i2p.util.Log;
* {@link net.i2p.router.InNetMessagePool} by way of the {@link UDPTransport}. * {@link net.i2p.router.InNetMessagePool} by way of the {@link UDPTransport}.
*/ */
class MessageReceiver { class MessageReceiver {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private UDPTransport _transport; private final UDPTransport _transport;
/** list of messages (InboundMessageState) fully received but not interpreted yet */ /** list of messages (InboundMessageState) fully received but not interpreted yet */
private final BlockingQueue<InboundMessageState> _completeMessages; private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive; private boolean _alive;

View File

@ -50,7 +50,7 @@ class OutboundEstablishState {
private long _nextSend; private long _nextSend;
private RemoteHostId _remoteHostId; private RemoteHostId _remoteHostId;
private final RouterIdentity _remotePeer; private final RouterIdentity _remotePeer;
private SessionKey _introKey; private final SessionKey _introKey;
private final Queue<OutNetMessage> _queuedMessages; private final Queue<OutNetMessage> _queuedMessages;
private int _currentState; private int _currentState;
private long _introductionNonce; private long _introductionNonce;

View File

@ -1,13 +1,16 @@
package net.i2p.router.transport.udp; package net.i2p.router.transport.udp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.RouterInfo; import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage; import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -23,16 +26,33 @@ import net.i2p.util.Log;
* *
*/ */
class OutboundMessageFragments { class OutboundMessageFragments {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private UDPTransport _transport; private final UDPTransport _transport;
// private ActiveThrottle _throttle; // LINT not used ?? // private ActiveThrottle _throttle; // LINT not used ??
/** peers we are actively sending messages to */
private final List<PeerState> _activePeers; /**
* Peers we are actively sending messages to.
* We use the iterator so we treat it like a list,
* but we use a HashSet so remove() is fast and
* we don't need to do contains().
* Even though most (but NOT all) accesses are synchronized,
* we use a ConcurrentHashSet as the iterator is long-lived.
*/
private final Set<PeerState> _activePeers;
/**
* The long-lived iterator over _activePeers.
*/
private Iterator<PeerState> _iterator;
/**
* Avoid sync in add() if possible (not 100% reliable)
*/
private boolean _isWaiting;
private boolean _alive; private boolean _alive;
/** which peer should we build the next packet out of? */ private final PacketBuilder _builder;
private int _nextPeer;
private PacketBuilder _builder;
private long _lastCycleTime = System.currentTimeMillis(); private long _lastCycleTime = System.currentTimeMillis();
/** if we can handle more messages explicitly, set this to true */ /** if we can handle more messages explicitly, set this to true */
@ -42,13 +62,14 @@ class OutboundMessageFragments {
// private static final int MAX_ACTIVE = 64; // not used. // private static final int MAX_ACTIVE = 64; // not used.
// don't send a packet more than 10 times // don't send a packet more than 10 times
static final int MAX_VOLLEYS = 10; static final int MAX_VOLLEYS = 10;
private static final int MAX_WAIT = 1000;
public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) { public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(OutboundMessageFragments.class); _log = ctx.logManager().getLog(OutboundMessageFragments.class);
_transport = transport; _transport = transport;
// _throttle = throttle; // _throttle = throttle;
_activePeers = new ArrayList(256); _activePeers = new ConcurrentHashSet(256);
_builder = new PacketBuilder(ctx, transport); _builder = new PacketBuilder(ctx, transport);
_alive = true; _alive = true;
// _allowExcess = false; // _allowExcess = false;
@ -59,6 +80,7 @@ class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendFailed", "How many sends a failed message was pushed", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendFailed", "How many sends a failed message was pushed", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES);
@ -72,20 +94,20 @@ class OutboundMessageFragments {
} }
public void startup() { _alive = true; } public void startup() { _alive = true; }
public void shutdown() { public void shutdown() {
_alive = false; _alive = false;
_activePeers.clear();
synchronized (_activePeers) { synchronized (_activePeers) {
_activePeers.notifyAll(); _activePeers.notifyAll();
} }
} }
void dropPeer(PeerState peer) { void dropPeer(PeerState peer) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Dropping peer " + peer.getRemotePeer().toBase64()); _log.info("Dropping peer " + peer.getRemotePeer().toBase64());
peer.dropOutbound(); peer.dropOutbound();
synchronized (_activePeers) { _activePeers.remove(peer);
_activePeers.remove(peer);
_activePeers.notifyAll();
}
} }
/** /**
@ -145,24 +167,12 @@ class OutboundMessageFragments {
return; return;
} }
int active = peer.add(state); int active = peer.add(state);
synchronized (_activePeers) { add(peer);
if (!_activePeers.contains(peer)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
_activePeers.add(peer);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
}
_activePeers.notifyAll();
}
//msg.timestamp("made active along with: " + active);
_context.statManager().addRateData("udp.outboundActiveCount", active, 0); _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error initializing " + msg); _log.warn("Error initializing " + msg);
} }
//finishMessages();
} }
/** /**
@ -174,149 +184,186 @@ class OutboundMessageFragments {
if (peer == null) if (peer == null)
throw new RuntimeException("wtf, null peer for " + state); throw new RuntimeException("wtf, null peer for " + state);
int active = peer.add(state); int active = peer.add(state);
synchronized (_activePeers) { add(peer);
if (!_activePeers.contains(peer)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
if (_activePeers.isEmpty())
_lastCycleTime = System.currentTimeMillis();
_activePeers.add(peer);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
}
_activePeers.notifyAll();
}
_context.statManager().addRateData("udp.outboundActiveCount", active, 0); _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
// should we finish messages here too? }
/*
synchronized (_activeMessages) { /**
_activeMessages.add(state); * Add the peer to the list of peers wanting to transmit something.
if (_activeMessages.size() == 1) * This wakes up the packet pusher if it is sleeping.
*
* Avoid synchronization where possible.
* There are small chances of races.
* There are larger chances of adding the PeerState "behind" where
* the iterator is now... but these issues are the same as before concurrentification.
*
* @since 0.8.9
*/
public void add(PeerState peer) {
boolean wasEmpty = _activePeers.isEmpty();
boolean added = _activePeers.add(peer);
if (added) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
if (wasEmpty)
_lastCycleTime = System.currentTimeMillis(); _lastCycleTime = System.currentTimeMillis();
_activeMessages.notifyAll(); } else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
}
_context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size(), 0);
// Avoid sync if possible
// no, this doesn't always work.
// Also note that the iterator in getNextVolley may have alreay passed us,
// or not reflect the addition.
if (_isWaiting || wasEmpty) {
synchronized (_activePeers) {
_activePeers.notifyAll();
}
} }
*/
} }
/** /**
* Remove any expired or complete messages * Remove any expired or complete messages
*/ */
/****
private void finishMessages() { private void finishMessages() {
int rv = 0; for (Iterator<PeerState> iter = _activePeers.iterator(); iter.hasNext(); ) {
List peers = null; PeerState state = iter.next();
synchronized (_activePeers) { if (state.getOutboundMessageCount() <= 0) {
peers = new ArrayList(_activePeers.size()); iter.remove();
for (int i = 0; i < _activePeers.size(); i++) { } else {
PeerState state = _activePeers.get(i); int remaining = state.finishMessages();
if (state.getOutboundMessageCount() <= 0) { if (remaining <= 0) {
_activePeers.remove(i); if (_log.shouldLog(Log.DEBUG))
i--; _log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
} else { iter.remove();
peers.add(state); }
} }
} }
_activePeers.notifyAll(); }
} ****/
for (int i = 0; i < peers.size(); i++) {
PeerState state = (PeerState)peers.get(i);
int remaining = state.finishMessages();
if (remaining <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
}
rv += remaining;
}
}
/** /**
* Fetch all the packets for a message volley, blocking until there is a * Fetch all the packets for a message volley, blocking until there is a
* message which can be fully transmitted (or the transport is shut down). * message which can be fully transmitted (or the transport is shut down).
* The returned array may be sparse, with null packets taking the place of * The returned array may be sparse, with null packets taking the place of
* already ACKed fragments. * already ACKed fragments.
* *
* NOT thread-safe. Called by the PacketPusher thread only.
*
* @return null only on shutdown
*/ */
public UDPPacket[] getNextVolley() { public UDPPacket[] getNextVolley() {
PeerState peer = null; PeerState peer = null;
OutboundMessageState state = null; OutboundMessageState state = null;
// Keep track of how many we've looked at, since we don't start the iterator at the beginning.
int peersProcessed = 0;
while (_alive && (state == null) ) { while (_alive && (state == null) ) {
long now = _context.clock().now(); int nextSendDelay = Integer.MAX_VALUE;
int nextSendDelay = -1; // no, not every time - O(n**2) - do just before waiting below
finishMessages(); //finishMessages();
try {
synchronized (_activePeers) { // do we need a new long-lived iterator?
for (int i = 0; i < _activePeers.size(); i++) { if (_iterator == null ||
int cur = (i + _nextPeer) % _activePeers.size(); ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
if (cur == 0) { _iterator = _activePeers.iterator();
// FIXME or delete, these stats aren't much help since they include the sleep time }
long ts = System.currentTimeMillis();
long cycleTime = ts - _lastCycleTime; // Go through all the peers that we are actively sending messages to.
_lastCycleTime = ts; // Call finishMessages() for each one, and remove them from the iterator
_context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size()); // if there is nothing left to send.
// make longer than the default sleep time below // Otherwise, return the volley to be sent.
if (cycleTime > 1100) // Otherwise, wait()
_context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size()); while (_iterator.hasNext()) {
peer = _iterator.next();
int remaining = peer.finishMessages();
if (remaining <= 0) {
// race with add()
_iterator.remove();
if (_log.shouldLog(Log.DEBUG))
_log.debug("No more pending messages for " + peer.getRemotePeer().toBase64());
continue;
} }
peer = _activePeers.get(i); peersProcessed++;
state = peer.allocateSend(); state = peer.allocateSend();
if (state != null) { if (state != null) {
// we have something to send and we will be returning it // we have something to send and we will be returning it
_nextPeer = i + 1; break;
} else if (peersProcessed >= _activePeers.size()) {
// we've gone all the way around, time to sleep
break; break;
} else { } else {
// Update the minimum delay for all peers (getNextDelay() returns 1 for "now") // Update the minimum delay for all peers
// which will be used if we found nothing to send across all peers // which will be used if we found nothing to send across all peers
int delay = peer.getNextDelay(); int delay = peer.getNextDelay();
if ( (nextSendDelay <= 0) || (delay < nextSendDelay) ) if (delay < nextSendDelay)
nextSendDelay = delay; nextSendDelay = delay;
peer = null; peer = null;
state = null;
} }
} }
if (_log.shouldLog(Log.DEBUG))
if (peer != null && _log.shouldLog(Log.DEBUG))
_log.debug("Done looping, next peer we are sending for: " + _log.debug("Done looping, next peer we are sending for: " +
(peer != null ? peer.getRemotePeer().toBase64() : "none")); peer.getRemotePeer().toBase64());
if (state == null) {
// if we've gone all the way through the loop, wait
// ... unless nextSendDelay says we have more ready now
if (state == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
_isWaiting = true;
peersProcessed = 0;
// why? we do this in the loop one at a time
//finishMessages();
// wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
// use max of 1 second so finishMessages() and/or PeerState.finishMessages()
// gets called regularly
int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("wait for " + nextSendDelay); _log.debug("wait for " + toWait);
// wait.. or somethin' // wait.. or somethin'
// wait a min of 10 and a max of 3000 ms no matter what peer.getNextDelay() says synchronized (_activePeers) {
if (nextSendDelay > 0) try {
_activePeers.wait(Math.min(Math.max(nextSendDelay, 10), 3000)); _activePeers.wait(toWait);
else } catch (InterruptedException ie) {
_activePeers.wait(1000); // noop
} else { if (_log.shouldLog(Log.DEBUG))
if (_log.shouldLog(Log.DEBUG)) _log.debug("Woken up while waiting");
_log.debug("dont wait: alive=" + _alive + " state = " + state); }
}
_isWaiting = false;
//} else {
// if (_log.shouldLog(Log.DEBUG))
// _log.debug("dont wait: alive=" + _alive + " state = " + state);
} }
}
} catch (InterruptedException ie) { } // while alive && state == null
// noop
if (_log.shouldLog(Log.DEBUG))
_log.debug("Woken up while waiting");
}
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + state); _log.debug("Sending " + state);
UDPPacket packets[] = preparePackets(state, peer); UDPPacket packets[] = preparePackets(state, peer);
/****
if ( (state != null) && (state.getMessage() != null) ) { if ( (state != null) && (state.getMessage() != null) ) {
int valid = 0; int valid = 0;
for (int i = 0; packets != null && i < packets.length ; i++) for (int i = 0; packets != null && i < packets.length ; i++)
if (packets[i] != null) if (packets[i] != null)
valid++; valid++;
/*
state.getMessage().timestamp("sending a volley of " + valid state.getMessage().timestamp("sending a volley of " + valid
+ " lastReceived: " + " lastReceived: "
+ (_context.clock().now() - peer.getLastReceiveTime()) + (_context.clock().now() - peer.getLastReceiveTime())
+ " lastSentFully: " + " lastSentFully: "
+ (_context.clock().now() - peer.getLastSendFullyTime())); + (_context.clock().now() - peer.getLastSendFullyTime()));
*/
} }
****/
return packets; return packets;
} }
/**
* @return null if state or peer is null
*/
private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) { private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
if ( (state != null) && (peer != null) ) { if ( (state != null) && (peer != null) ) {
int fragments = state.getFragmentCount(); int fragments = state.getFragmentCount();
@ -397,37 +444,6 @@ class OutboundMessageFragments {
} }
} }
/**
* We received an ACK of the given messageId from the given peer, so if it
* is still unacked, mark it as complete.
*
* @return fragments acked
*/
public int acked(long messageId, Hash ackedBy) {
PeerState peer = _transport.getPeerState(ackedBy);
if (peer != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("acked [" + messageId + "] by " + ackedBy.toBase64());
return peer.acked(messageId);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("acked [" + messageId + "] by an unknown remote peer? " + ackedBy.toBase64());
return 0;
}
}
public void acked(ACKBitfield bitfield, Hash ackedBy) {
PeerState peer = _transport.getPeerState(ackedBy);
if (peer != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64());
peer.acked(bitfield);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64());
}
}
public interface ActiveThrottle { public interface ActiveThrottle {
public void choke(Hash peer); public void choke(Hash peer);
public void unchoke(Hash peer); public void unchoke(Hash peer);

View File

@ -12,12 +12,12 @@ import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* Maintain the outbound fragmentation for resending * Maintain the outbound fragmentation for resending, for a single message.
* *
*/ */
class OutboundMessageState { class OutboundMessageState {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
/** may be null if we are part of the establishment */ /** may be null if we are part of the establishment */
private OutNetMessage _message; private OutNetMessage _message;
private long _messageId; private long _messageId;
@ -49,6 +49,7 @@ class OutboundMessageState {
_log = _context.logManager().getLog(OutboundMessageState.class); _log = _context.logManager().getLog(OutboundMessageState.class);
} }
/****
public boolean initialize(OutNetMessage msg) { public boolean initialize(OutNetMessage msg) {
if (msg == null) return false; if (msg == null) return false;
try { try {
@ -60,7 +61,11 @@ class OutboundMessageState {
return false; return false;
} }
} }
****/
/**
* Called from UDPTransport
*/
public boolean initialize(I2NPMessage msg, PeerState peer) { public boolean initialize(I2NPMessage msg, PeerState peer) {
if (msg == null) if (msg == null)
return false; return false;
@ -75,6 +80,9 @@ class OutboundMessageState {
} }
} }
/**
* Called from OutboundMessageFragments
*/
public boolean initialize(OutNetMessage m, I2NPMessage msg) { public boolean initialize(OutNetMessage m, I2NPMessage msg) {
if ( (m == null) || (msg == null) ) if ( (m == null) || (msg == null) )
return false; return false;
@ -198,6 +206,7 @@ class OutboundMessageState {
sends[i] = (short)-1; sends[i] = (short)-1;
boolean rv = isComplete(); boolean rv = isComplete();
/****
if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed
long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY); long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY);
//_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO); //_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO);
@ -210,6 +219,7 @@ class OutboundMessageState {
// _nextSendTime = now + 100; // _nextSendTime = now + 100;
//_nextSendTime = now; //_nextSendTime = now;
} }
****/
return rv; return rv;
} }

View File

@ -95,9 +95,9 @@ around briefly, to address packet loss and reordering.</p>
* *
*/ */
class PacketBuilder { class PacketBuilder {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private UDPTransport _transport; private final UDPTransport _transport;
private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE); private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH); private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH);
@ -656,6 +656,8 @@ class PacketBuilder {
/** /**
* Build a destroy packet, which contains a header but no body. * Build a destroy packet, which contains a header but no body.
* Session must be established or this will NPE in authenticate().
* Unused until 0.8.9.
* *
* @since 0.8.1 * @since 0.8.1
*/ */

View File

@ -372,8 +372,8 @@ class PacketHandler {
if (state.getMACKey() != null) { if (state.getMACKey() != null) {
isValid = packet.validate(state.getMACKey()); isValid = packet.validate(state.getMACKey());
if (isValid) { if (isValid) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("Valid introduction packet received for inbound con: " + packet); _log.info("Valid introduction packet received for inbound con: " + packet);
_state = 32; _state = 32;
packet.decrypt(state.getCipherKey()); packet.decrypt(state.getCipherKey());
@ -418,8 +418,8 @@ class PacketHandler {
_state = 36; _state = 36;
isValid = packet.validate(state.getMACKey()); isValid = packet.validate(state.getMACKey());
if (isValid) { if (isValid) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("Valid introduction packet received for outbound established con: " + packet); _log.info("Valid introduction packet received for outbound established con: " + packet);
_state = 37; _state = 37;
packet.decrypt(state.getCipherKey()); packet.decrypt(state.getCipherKey());
@ -432,8 +432,8 @@ class PacketHandler {
// keys not yet exchanged, lets try it with the peer's intro key // keys not yet exchanged, lets try it with the peer's intro key
isValid = packet.validate(state.getIntroKey()); isValid = packet.validate(state.getIntroKey());
if (isValid) { if (isValid) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet); _log.info("Valid introduction packet received for outbound established con with old intro key: " + packet);
_state = 39; _state = 39;
packet.decrypt(state.getIntroKey()); packet.decrypt(state.getIntroKey());
handlePacket(reader, packet, null, state, null); handlePacket(reader, packet, null, state, null);

View File

@ -11,10 +11,10 @@ import net.i2p.util.Log;
*/ */
class PacketPusher implements Runnable { class PacketPusher implements Runnable {
// private RouterContext _context; // private RouterContext _context;
private Log _log; private final Log _log;
private OutboundMessageFragments _fragments; private final OutboundMessageFragments _fragments;
private UDPSender _sender; private final UDPSender _sender;
private boolean _alive; private volatile boolean _alive;
public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) { public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) {
// _context = ctx; // _context = ctx;

View File

@ -24,8 +24,8 @@ import net.i2p.util.ConcurrentHashSet;
* *
*/ */
class PeerState { class PeerState {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
/** /**
* The peer are we talking to. This should be set as soon as this * The peer are we talking to. This should be set as soon as this
* state is created if we are initiating a connection, but if we are * state is created if we are initiating a connection, but if we are
@ -157,7 +157,7 @@ class PeerState {
/* how many consecutive packets at or under the min MTU have been received */ /* how many consecutive packets at or under the min MTU have been received */
private long _consecutiveSmall; private long _consecutiveSmall;
/** when did we last check the MTU? */ /** when did we last check the MTU? */
private long _mtuLastChecked; //private long _mtuLastChecked;
private long _mtuIncreases; private long _mtuIncreases;
private long _mtuDecreases; private long _mtuDecreases;
/** current round trip time estimate */ /** current round trip time estimate */
@ -192,7 +192,7 @@ class PeerState {
/** which outbound message is currently being retransmitted */ /** which outbound message is currently being retransmitted */
private OutboundMessageState _retransmitter; private OutboundMessageState _retransmitter;
private UDPTransport _transport; private final UDPTransport _transport;
/** have we migrated away from this peer to another newer one? */ /** have we migrated away from this peer to another newer one? */
private volatile boolean _dead; private volatile boolean _dead;
@ -224,7 +224,7 @@ class PeerState {
* of 608 * of 608
* *
* Well, we really need to count the acks as well, especially * Well, we really need to count the acks as well, especially
* 4 * MAX_RESEND_ACKS which can take up a significant amount of space. * 1 + (4 * MAX_RESEND_ACKS_SMALL) which can take up a significant amount of space.
* We reduce the max acks when using the small MTU but it may not be enough... * We reduce the max acks when using the small MTU but it may not be enough...
* *
*/ */
@ -234,8 +234,14 @@ class PeerState {
* based on measurements, 1350 fits nearly all reasonably small I2NP messages * based on measurements, 1350 fits nearly all reasonably small I2NP messages
* (larger I2NP messages may be up to 1900B-4500B, which isn't going to fit * (larger I2NP messages may be up to 1900B-4500B, which isn't going to fit
* into a live network MTU anyway) * into a live network MTU anyway)
*
* TODO
* VTBM is 2646, it would be nice to fit in two large
* 2646 / 2 = 1323
* 1323 + 74 + 46 + 1 + (4 * 9) = 1480
* So why not make it 1492 (old ethernet is 1492, new is 1500)
*/ */
private static final int LARGE_MTU = 1350; private static final int LARGE_MTU = 1492;
private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY; private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY;
private static final int MAX_RTO = 3000; // 5000; private static final int MAX_RTO = 3000; // 5000;
@ -261,25 +267,14 @@ class PeerState {
_remotePort = -1; _remotePort = -1;
_mtu = getDefaultMTU(); _mtu = getDefaultMTU();
_mtuReceive = _mtu; _mtuReceive = _mtu;
_mtuLastChecked = -1; //_mtuLastChecked = -1;
_lastACKSend = -1; _lastACKSend = -1;
_rto = MIN_RTO; _rto = MIN_RTO;
_rtt = _rto/2; _rtt = _rto/2;
_rttDeviation = _rtt; _rttDeviation = _rtt;
_inboundMessages = new HashMap(8); _inboundMessages = new HashMap(8);
_outboundMessages = new ArrayList(32); _outboundMessages = new ArrayList(32);
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES); // all createRateStat() moved to EstablishmentManager
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
} }
private int getDefaultMTU() { private int getDefaultMTU() {
@ -381,13 +376,20 @@ class PeerState {
public long getTheyRelayToUsAs() { return _theyRelayToUsAs; } public long getTheyRelayToUsAs() { return _theyRelayToUsAs; }
/** what is the largest packet we can send to the peer? */ /** what is the largest packet we can send to the peer? */
public int getMTU() { return _mtu; } public int getMTU() { return _mtu; }
/** estimate how large the other side is sending packets */
/**
* Estimate how large the other side's MTU is.
* This could be wrong.
* It is used only for the HTML status.
*/
public int getReceiveMTU() { return _mtuReceive; } public int getReceiveMTU() { return _mtuReceive; }
/** when did we last check the MTU? */ /** when did we last check the MTU? */
/****
public long getMTULastChecked() { return _mtuLastChecked; } public long getMTULastChecked() { return _mtuLastChecked; }
public long getMTUIncreases() { return _mtuIncreases; } public long getMTUIncreases() { return _mtuIncreases; }
public long getMTUDecreases() { return _mtuDecreases; } public long getMTUDecreases() { return _mtuDecreases; }
****/
/** /**
* The peer are we talking to. This should be set as soon as this * The peer are we talking to. This should be set as soon as this
@ -548,11 +550,15 @@ class PeerState {
* we can use to publish that fact. * we can use to publish that fact.
*/ */
public void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; } public void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; }
/** what is the largest packet we can send to the peer? */ /** what is the largest packet we can send to the peer? */
/****
public void setMTU(int mtu) { public void setMTU(int mtu) {
_mtu = mtu; _mtu = mtu;
_mtuLastChecked = _context.clock().now(); _mtuLastChecked = _context.clock().now();
} }
****/
public int getSlowStartThreshold() { return _slowStartThreshold; } public int getSlowStartThreshold() { return _slowStartThreshold; }
public int getConcurrentSends() { return _concurrentMessagesActive; } public int getConcurrentSends() { return _concurrentMessagesActive; }
public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; } public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; }
@ -990,15 +996,21 @@ class PeerState {
public long getPacketRetransmissionRate() { return _packetRetransmissionRate; } public long getPacketRetransmissionRate() { return _packetRetransmissionRate; }
public long getPacketsReceived() { return _packetsReceived; } public long getPacketsReceived() { return _packetsReceived; }
public long getPacketsReceivedDuplicate() { return _packetsReceivedDuplicate; } public long getPacketsReceivedDuplicate() { return _packetsReceivedDuplicate; }
private static final int MTU_RCV_DISPLAY_THRESHOLD = 20;
public void packetReceived(int size) { public void packetReceived(int size) {
_packetsReceived++; _packetsReceived++;
if (size <= MIN_MTU) if (size <= MIN_MTU) {
_consecutiveSmall++; _consecutiveSmall++;
else } else {
_consecutiveSmall = 0; _consecutiveSmall = 0;
_mtuReceive = LARGE_MTU;
return;
}
if (_packetsReceived > 50) { if (_packetsReceived > MTU_RCV_DISPLAY_THRESHOLD) {
if (_consecutiveSmall < 50) if (_consecutiveSmall < MTU_RCV_DISPLAY_THRESHOLD)
_mtuReceive = LARGE_MTU; _mtuReceive = LARGE_MTU;
else else
_mtuReceive = MIN_MTU; _mtuReceive = MIN_MTU;
@ -1061,7 +1073,6 @@ class PeerState {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
if (msgs == null) return 0;
int rv = 0; int rv = 0;
boolean fail = false; boolean fail = false;
synchronized (msgs) { synchronized (msgs) {
@ -1070,11 +1081,14 @@ class PeerState {
// 32 queued messages? to *one* peer? nuh uh. // 32 queued messages? to *one* peer? nuh uh.
fail = true; fail = true;
rv--; rv--;
/******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
} else if (_retransmitter != null) { } else if (_retransmitter != null) {
long lifetime = _retransmitter.getLifetime(); long lifetime = _retransmitter.getLifetime();
long totalLifetime = lifetime; long totalLifetime = lifetime;
for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter
OutboundMessageState cur = (OutboundMessageState)msgs.get(i); OutboundMessageState cur = msgs.get(i);
totalLifetime += cur.getLifetime(); totalLifetime += cur.getLifetime();
} }
long remaining = -1; long remaining = -1;
@ -1103,6 +1117,9 @@ class PeerState {
_context.statManager().addRateData("udp.queueAllowTotalLifetime", totalLifetime, lifetime); _context.statManager().addRateData("udp.queueAllowTotalLifetime", totalLifetime, lifetime);
msgs.add(state); msgs.add(state);
} }
*******/
} else { } else {
msgs.add(state); msgs.add(state);
} }
@ -1111,6 +1128,7 @@ class PeerState {
_transport.failed(state, false); _transport.failed(state, false);
return rv; return rv;
} }
/** drop all outbound messages */ /** drop all outbound messages */
public void dropOutbound() { public void dropOutbound() {
//if (_dead) return; //if (_dead) return;
@ -1118,7 +1136,7 @@ class PeerState {
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
//_outboundMessages = null; //_outboundMessages = null;
_retransmitter = null; _retransmitter = null;
if (msgs != null) {
int sz = 0; int sz = 0;
List<OutboundMessageState> tempList = null; List<OutboundMessageState> tempList = null;
synchronized (msgs) { synchronized (msgs) {
@ -1130,21 +1148,17 @@ class PeerState {
} }
for (int i = 0; i < sz; i++) for (int i = 0; i < sz; i++)
_transport.failed(tempList.get(i), false); _transport.failed(tempList.get(i), false);
}
// so the ACKSender will drop this peer from its queue // so the ACKSender will drop this peer from its queue
_wantACKSendSince = -1; _wantACKSendSince = -1;
} }
/**
* @return number of active outbound messages remaining (unsynchronized)
*/
public int getOutboundMessageCount() { public int getOutboundMessageCount() {
List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return 0; if (_dead) return 0;
if (msgs != null) { return _outboundMessages.size();
synchronized (msgs) {
return msgs.size();
}
} else {
return 0;
}
} }
/** /**
@ -1152,39 +1166,37 @@ class PeerState {
* @return number of active outbound messages remaining * @return number of active outbound messages remaining
*/ */
public int finishMessages() { public int finishMessages() {
int rv = 0;
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
// short circuit, unsynchronized
if (msgs.isEmpty())
return 0;
if (_dead) { if (_dead) {
dropOutbound(); dropOutbound();
return 0; return 0;
} }
int rv = 0;
List<OutboundMessageState> succeeded = null; List<OutboundMessageState> succeeded = null;
List<OutboundMessageState> failed = null; List<OutboundMessageState> failed = null;
synchronized (msgs) { synchronized (msgs) {
int size = msgs.size(); for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
for (int i = 0; i < size; i++) { OutboundMessageState state = iter.next();
OutboundMessageState state = msgs.get(i);
if (state.isComplete()) { if (state.isComplete()) {
msgs.remove(i); iter.remove();
i--;
size--;
if (_retransmitter == state) if (_retransmitter == state)
_retransmitter = null; _retransmitter = null;
if (succeeded == null) succeeded = new ArrayList(4); if (succeeded == null) succeeded = new ArrayList(4);
succeeded.add(state); succeeded.add(state);
} else if (state.isExpired()) { } else if (state.isExpired()) {
msgs.remove(i); iter.remove();
i--;
size--;
if (_retransmitter == state) if (_retransmitter == state)
_retransmitter = null; _retransmitter = null;
_context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
if (failed == null) failed = new ArrayList(4); if (failed == null) failed = new ArrayList(4);
failed.add(state); failed.add(state);
} else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
msgs.remove(i); iter.remove();
i--;
size--;
if (state == _retransmitter) if (state == _retransmitter)
_retransmitter = null; _retransmitter = null;
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
@ -1232,9 +1244,7 @@ class PeerState {
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return null; if (_dead) return null;
synchronized (msgs) { synchronized (msgs) {
int size = msgs.size(); for (OutboundMessageState state : msgs) {
for (int i = 0; i < size; i++) {
OutboundMessageState state = msgs.get(i);
if (locked_shouldSend(state)) { if (locked_shouldSend(state)) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId()); _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
@ -1261,28 +1271,22 @@ class PeerState {
} }
/** /**
* return how long to wait before sending, or -1 if we have nothing to send * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
* If ready now, will return 0 or a negative value.
*/ */
public int getNextDelay() { public int getNextDelay() {
int rv = -1; int rv = Integer.MAX_VALUE;
if (_dead) return rv;
long now = _context.clock().now(); long now = _context.clock().now();
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return -1;
synchronized (msgs) { synchronized (msgs) {
if (_retransmitter != null) { if (_retransmitter != null) {
rv = (int)(_retransmitter.getNextSendTime() - now); rv = (int)(_retransmitter.getNextSendTime() - now);
if (rv <= 0) return rv;
return 1;
else
return rv;
} }
int size = msgs.size(); for (OutboundMessageState state : msgs) {
for (int i = 0; i < size; i++) {
OutboundMessageState state = msgs.get(i);
int delay = (int)(state.getNextSendTime() - now); int delay = (int)(state.getNextSendTime() - now);
if (delay <= 0) if (delay < rv)
delay = 1;
if ( (rv <= 0) || (delay < rv) )
rv = delay; rv = delay;
} }
} }
@ -1306,7 +1310,11 @@ class PeerState {
private static final int SSU_HEADER_SIZE = 46; private static final int SSU_HEADER_SIZE = 46;
static final int UDP_HEADER_SIZE = 8; static final int UDP_HEADER_SIZE = 8;
static final int IP_HEADER_SIZE = 20; static final int IP_HEADER_SIZE = 20;
/** how much payload data can we shove in there? */
/**
* how much payload data can we shove in there?
* @return MTU - 74
*/
private static final int fragmentSize(int mtu) { private static final int fragmentSize(int mtu) {
return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE; return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE;
} }
@ -1315,7 +1323,7 @@ class PeerState {
long now = _context.clock().now(); long now = _context.clock().now();
if (state.getNextSendTime() <= now) { if (state.getNextSendTime() <= now) {
if (!state.isFragmented()) { if (!state.isFragmented()) {
state.fragment(fragmentSize(getMTU())); state.fragment(fragmentSize(_mtu));
if (state.getMessage() != null) if (state.getMessage() != null)
state.getMessage().timestamp("fragment into " + state.getFragmentCount()); state.getMessage().timestamp("fragment into " + state.getFragmentCount());
@ -1372,14 +1380,14 @@ class PeerState {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
//if (state.getMessage() != null) //if (state.getMessage() != null)
// state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); // state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+ " available=" + getSendWindowBytesRemaining() + " available=" + getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state); + " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) + state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
_context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK); _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
//_throttle.choke(peer.getRemotePeer()); //_throttle.choke(peer.getRemotePeer());
//if (state.getMessage() != null) //if (state.getMessage() != null)
@ -1393,16 +1401,20 @@ class PeerState {
return false; return false;
} }
public int acked(long messageId) { /**
* A full ACK was received.
*
* @return true if the message was acked for the first time
*/
public boolean acked(long messageId) {
if (_dead) return false;
OutboundMessageState state = null; OutboundMessageState state = null;
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return 0;
synchronized (msgs) { synchronized (msgs) {
int sz = msgs.size(); for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
for (int i = 0; i < sz; i++) { state = iter.next();
state = msgs.get(i);
if (state.getMessageId() == messageId) { if (state.getMessageId() == messageId) {
msgs.remove(i); iter.remove();
break; break;
} else { } else {
state = null; state = null;
@ -1438,22 +1450,25 @@ class PeerState {
// _throttle.unchoke(peer.getRemotePeer()); // _throttle.unchoke(peer.getRemotePeer());
state.releaseResources(); state.releaseResources();
return numFragments;
} else { } else {
// dupack, likely // dupack, likely
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + messageId); _log.debug("Received an ACK for a message not pending: " + messageId);
return 0;
} }
return state != null;
} }
public void acked(ACKBitfield bitfield) { /**
* A partial ACK was received. This is much less common than full ACKs.
*
* @return true if the message was completely acked for the first time
*/
public boolean acked(ACKBitfield bitfield) {
if (_dead) if (_dead)
return; return false;
if (bitfield.receivedComplete()) { if (bitfield.receivedComplete()) {
acked(bitfield.getMessageId()); return acked(bitfield.getMessageId());
return;
} }
List<OutboundMessageState> msgs = _outboundMessages; List<OutboundMessageState> msgs = _outboundMessages;
@ -1461,13 +1476,13 @@ class PeerState {
OutboundMessageState state = null; OutboundMessageState state = null;
boolean isComplete = false; boolean isComplete = false;
synchronized (msgs) { synchronized (msgs) {
for (int i = 0; i < msgs.size(); i++) { for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
state = msgs.get(i); state = iter.next();
if (state.getMessageId() == bitfield.getMessageId()) { if (state.getMessageId() == bitfield.getMessageId()) {
boolean complete = state.acked(bitfield); boolean complete = state.acked(bitfield);
if (complete) { if (complete) {
isComplete = true; isComplete = true;
msgs.remove(i); iter.remove();
if (state == _retransmitter) if (state == _retransmitter)
_retransmitter = null; _retransmitter = null;
} }
@ -1514,12 +1529,12 @@ class PeerState {
//if (state.getMessage() != null) //if (state.getMessage() != null)
// state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); // state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
} }
return; return isComplete;
} else { } else {
// dupack // dupack
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + bitfield); _log.debug("Received an ACK for a message not pending: " + bitfield);
return; return false;
} }
} }
@ -1581,6 +1596,8 @@ class PeerState {
} }
} }
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
/* /*
public int hashCode() { public int hashCode() {
if (_remotePeer != null) if (_remotePeer != null)

View File

@ -91,17 +91,17 @@ with either Bob or Charlie, but it is not required.</p>
*/ */
class PeerTestManager { class PeerTestManager {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private UDPTransport _transport; private final UDPTransport _transport;
private PacketBuilder _packetBuilder; private final PacketBuilder _packetBuilder;
/** map of Long(nonce) to PeerTestState for tests currently in progress (as Bob/Charlie) */ /** map of Long(nonce) to PeerTestState for tests currently in progress (as Bob/Charlie) */
private final Map<Long, PeerTestState> _activeTests; private final Map<Long, PeerTestState> _activeTests;
/** current test we are running (as Alice), or null */ /** current test we are running (as Alice), or null */
private PeerTestState _currentTest; private PeerTestState _currentTest;
private boolean _currentTestComplete; private boolean _currentTestComplete;
/** as Alice */ /** as Alice */
private Queue<Long> _recentTests; private final Queue<Long> _recentTests;
/** longest we will keep track of a Charlie nonce for */ /** longest we will keep track of a Charlie nonce for */
private static final int MAX_CHARLIE_LIFETIME = 10*1000; private static final int MAX_CHARLIE_LIFETIME = 10*1000;

View File

@ -17,7 +17,7 @@ import net.i2p.util.Log;
* with code to fail messages that expire. * with code to fail messages that expire.
* *
* WARNING - UNUSED since 0.6.1.11 * WARNING - UNUSED since 0.6.1.11
* See comments in DQAT.java and mtn history ca. 2006-02-19 * See comments in DummyThrottle.java and mtn history ca. 2006-02-19
* *
*/ */
class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle { class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle {

View File

@ -137,7 +137,7 @@ class UDPEndpoint {
* Add the packet to the outobund queue to be sent ASAP (as allowed by * Add the packet to the outobund queue to be sent ASAP (as allowed by
* the bandwidth limiter) * the bandwidth limiter)
* *
* @return number of packets in the send queue * @return ZERO (used to be number of packets in the queue)
*/ */
public int send(UDPPacket packet) { public int send(UDPPacket packet) {
if (_sender == null) if (_sender == null)

View File

@ -19,13 +19,13 @@ import net.i2p.util.Log;
class UDPPacket { class UDPPacket {
private I2PAppContext _context; private I2PAppContext _context;
private static Log _log; private static Log _log;
private volatile DatagramPacket _packet; private final DatagramPacket _packet;
private volatile short _priority; private volatile short _priority;
private volatile long _initializeTime; private volatile long _initializeTime;
private volatile long _expiration; private volatile long _expiration;
private byte[] _data; private final byte[] _data;
private byte[] _validateBuf; private final byte[] _validateBuf;
private byte[] _ivBuf; private final byte[] _ivBuf;
private volatile int _markedType; private volatile int _markedType;
private volatile RemoteHostId _remoteHost; private volatile RemoteHostId _remoteHost;
private volatile boolean _released; private volatile boolean _released;
@ -51,7 +51,13 @@ class UDPPacket {
_log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class);
} }
static final int MAX_PACKET_SIZE = 2048; /**
* Actually it is one less than this, we assume
* if a received packet is this big it is truncated.
* This is bigger than PeerState.LARGE_MTU, as the far-end's
* LARGE_MTU may be larger than ours.
*/
static final int MAX_PACKET_SIZE = 1536;
public static final int IV_SIZE = 16; public static final int IV_SIZE = 16;
public static final int MAC_SIZE = 16; public static final int MAC_SIZE = 16;
@ -81,21 +87,26 @@ class UDPPacket {
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE; private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
private UDPPacket(I2PAppContext ctx, boolean inbound) { private UDPPacket(I2PAppContext ctx) {
ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES); ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES);
// the data buffer is clobbered on init(..), but we need it to bootstrap // the data buffer is clobbered on init(..), but we need it to bootstrap
_data = new byte[MAX_PACKET_SIZE]; _data = new byte[MAX_PACKET_SIZE];
_packet = new DatagramPacket(_data, MAX_PACKET_SIZE); _packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
_validateBuf = new byte[MAX_VALIDATE_SIZE]; _validateBuf = new byte[MAX_VALIDATE_SIZE];
_ivBuf = new byte[IV_SIZE]; _ivBuf = new byte[IV_SIZE];
init(ctx, inbound); init(ctx);
} }
// FIXME optimization, remove the inbound parameter, as it is unused. FIXME
private void init(I2PAppContext ctx, boolean inbound) { private void init(I2PAppContext ctx) {
_context = ctx; _context = ctx;
//_dataBuf = _dataCache.acquire(); //_dataBuf = _dataCache.acquire();
Arrays.fill(_data, (byte)0); Arrays.fill(_data, (byte)0);
//_packet = new DatagramPacket(_data, MAX_PACKET_SIZE); //_packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
//
// WARNING -
// Doesn't seem like we should have to do this every time,
// from reading the DatagramPacket javadocs,
// but we get massive corruption without it.
_packet.setData(_data); _packet.setData(_data);
// _isInbound = inbound; // _isInbound = inbound;
_initializeTime = _context.clock().now(); _initializeTime = _context.clock().now();
@ -262,15 +273,18 @@ class UDPPacket {
return buf.toString(); return buf.toString();
} }
/**
* @param inbound unused
*/
public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) { public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) {
UDPPacket rv = null; UDPPacket rv = null;
if (CACHE) { if (CACHE) {
rv = _packetCache.poll(); rv = _packetCache.poll();
if (rv != null) if (rv != null)
rv.init(ctx, inbound); rv.init(ctx);
} }
if (rv == null) if (rv == null)
rv = new UDPPacket(ctx, inbound); rv = new UDPPacket(ctx);
//if (rv._acquiredBy != null) { //if (rv._acquiredBy != null) {
// _log.log(Log.CRIT, "Already acquired! current stack trace is:", new Exception()); // _log.log(Log.CRIT, "Already acquired! current stack trace is:", new Exception());
// _log.log(Log.CRIT, "Earlier acquired:", rv._acquiredBy); // _log.log(Log.CRIT, "Earlier acquired:", rv._acquiredBy);

View File

@ -16,19 +16,19 @@ import net.i2p.util.Log;
* *
*/ */
class UDPPacketReader { class UDPPacketReader {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private byte _message[]; private byte _message[];
private int _payloadBeginOffset; private int _payloadBeginOffset;
private int _payloadLength; private int _payloadLength;
private SessionRequestReader _sessionRequestReader; private final SessionRequestReader _sessionRequestReader;
private SessionCreatedReader _sessionCreatedReader; private final SessionCreatedReader _sessionCreatedReader;
private SessionConfirmedReader _sessionConfirmedReader; private final SessionConfirmedReader _sessionConfirmedReader;
private DataReader _dataReader; private final DataReader _dataReader;
private PeerTestReader _peerTestReader; private final PeerTestReader _peerTestReader;
private RelayRequestReader _relayRequestReader; private final RelayRequestReader _relayRequestReader;
private RelayIntroReader _relayIntroReader; private final RelayIntroReader _relayIntroReader;
private RelayResponseReader _relayResponseReader; private final RelayResponseReader _relayResponseReader;
private static final int KEYING_MATERIAL_LENGTH = 64; private static final int KEYING_MATERIAL_LENGTH = 64;
@ -354,7 +354,9 @@ class UDPPacketReader {
off++; // fragment info off++; // fragment info
return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF; return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
} }
public void readMessageFragment(int fragmentNum, byte target[], int targetOffset) {
public void readMessageFragment(int fragmentNum, byte target[], int targetOffset)
throws ArrayIndexOutOfBoundsException {
int off = getFragmentBegin(fragmentNum); int off = getFragmentBegin(fragmentNum);
off += 4; // messageId off += 4; // messageId
off++; // fragment info off++; // fragment info

View File

@ -243,6 +243,10 @@ class UDPReceiver {
// and block after we know how much we read but before // and block after we know how much we read but before
// we release the packet to the inbound queue // we release the packet to the inbound queue
if (size >= UDPPacket.MAX_PACKET_SIZE) {
// DatagramSocket javadocs: If the message is longer than the packet's length, the message is truncated.
throw new IOException("packet too large! truncated and dropped");
}
if (size > 0) { if (size > 0) {
//FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); //FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
//_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver"); //_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver");

View File

@ -16,13 +16,13 @@ import net.i2p.util.Log;
* *
*/ */
class UDPSender { class UDPSender {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private DatagramSocket _socket; private DatagramSocket _socket;
private String _name; private String _name;
private final BlockingQueue<UDPPacket> _outboundQueue; private final BlockingQueue<UDPPacket> _outboundQueue;
private boolean _keepRunning; private boolean _keepRunning;
private Runner _runner; private final Runner _runner;
private static final int TYPE_POISON = 99999; private static final int TYPE_POISON = 99999;
//private static final int MAX_QUEUED = 4; //private static final int MAX_QUEUED = 4;
@ -91,7 +91,7 @@ class UDPSender {
* available, if requested, otherwise it returns immediately * available, if requested, otherwise it returns immediately
* *
* @param blockTime how long to block IGNORED * @param blockTime how long to block IGNORED
* @return number of packets queued * @return ZERO (used to be number of packets in the queue)
* @deprecated use add(packet) * @deprecated use add(packet)
*/ */
public int add(UDPPacket packet, int blockTime) { public int add(UDPPacket packet, int blockTime) {

View File

@ -64,6 +64,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private final IntroductionManager _introManager; private final IntroductionManager _introManager;
private final ExpirePeerEvent _expireEvent; private final ExpirePeerEvent _expireEvent;
private final PeerTestEvent _testEvent; private final PeerTestEvent _testEvent;
private final PacketBuilder _destroyBuilder;
private short _reachabilityStatus; private short _reachabilityStatus;
private long _reachabilityStatusLastUpdated; private long _reachabilityStatusLastUpdated;
private long _introducersSelectedOn; private long _introducersSelectedOn;
@ -184,7 +185,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_peersByRemoteHost = new ConcurrentHashMap(128); _peersByRemoteHost = new ConcurrentHashMap(128);
_dropList = new ConcurrentHashSet(2); _dropList = new ConcurrentHashSet(2);
// See comments in DQAT.java // See comments in DummyThrottle.java
if (USE_PRIORITY) { if (USE_PRIORITY) {
TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
_outboundMessages = mq; _outboundMessages = mq;
@ -200,6 +201,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_cachedBid[i] = new SharedBid(BID_VALUES[i]); _cachedBid[i] = new SharedBid(BID_VALUES[i]);
} }
_destroyBuilder = new PacketBuilder(_context, this);
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this); _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
if (SHOULD_FLOOD_PEERS) if (SHOULD_FLOOD_PEERS)
@ -296,7 +298,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_handler == null) if (_handler == null)
_handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager); _handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager);
// See comments in DQAT.java // See comments in DummyThrottle.java
if (USE_PRIORITY && _refiller == null) if (USE_PRIORITY && _refiller == null)
_refiller = new OutboundRefiller(_context, _fragments, _outboundMessages); _refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
@ -337,6 +339,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
public void shutdown() { public void shutdown() {
destroyAll();
if (_endpoint != null) if (_endpoint != null)
_endpoint.shutdown(); _endpoint.shutdown();
if (_flooder != null) if (_flooder != null)
@ -345,14 +348,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_refiller.shutdown(); _refiller.shutdown();
if (_handler != null) if (_handler != null)
_handler.shutdown(); _handler.shutdown();
_fragments.shutdown();
if (_pusher != null) if (_pusher != null)
_pusher.shutdown(); _pusher.shutdown();
_fragments.shutdown();
if (_establisher != null) if (_establisher != null)
_establisher.shutdown(); _establisher.shutdown();
_inboundFragments.shutdown(); _inboundFragments.shutdown();
_expireEvent.setIsAlive(false); _expireEvent.setIsAlive(false);
_testEvent.setIsAlive(false); _testEvent.setIsAlive(false);
_peersByRemoteHost.clear();
_peersByIdent.clear();
_dropList.clear();
_introManager.reset();
} }
/** /**
@ -1011,12 +1018,53 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/ */
} }
/**
* This sends it directly out, bypassing OutboundMessageFragments
* and the PacketPusher. The only queueing is for the bandwidth limiter.
*
* @return ZERO (used to be number of packets in the queue)
*/
int send(UDPPacket packet) { int send(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending packet " + packet); _log.debug("Sending packet " + packet);
return _endpoint.send(packet); return _endpoint.send(packet);
} }
/**
* Send a session destroy message, bypassing OMF and PacketPusher.
*
* @since 0.8.9
*/
private void sendDestroy(PeerState peer) {
// peer must be fully established
if (peer.getCurrentCipherKey() == null)
return;
UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer);
if (_log.shouldLog(Log.WARN))
_log.warn("Sending destroy to : " + peer);
send(pkt);
}
/**
* Send a session destroy message to everybody
*
* @since 0.8.9
*/
private void destroyAll() {
int howMany = _peersByIdent.size();
if (_log.shouldLog(Log.WARN))
_log.warn("Sending destroy to : " + howMany + " peers");
for (PeerState peer : _peersByIdent.values()) {
sendDestroy(peer);
}
int toSleep = Math.min(howMany / 3, 750);
if (toSleep > 0) {
try {
Thread.sleep(toSleep);
} catch (InterruptedException ie) {}
}
}
/** minimum active peers to maintain IP detection, etc. */ /** minimum active peers to maintain IP detection, etc. */
private static final int MIN_PEERS = 3; private static final int MIN_PEERS = 3;
/** minimum peers volunteering to be introducers if we need that */ /** minimum peers volunteering to be introducers if we need that */
@ -1112,6 +1160,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int MIN_EXPIRE_TIMEOUT = 10*60*1000; private static final int MIN_EXPIRE_TIMEOUT = 10*60*1000;
public String getStyle() { return STYLE; } public String getStyle() { return STYLE; }
@Override @Override
public void send(OutNetMessage msg) { public void send(OutNetMessage msg) {
if (msg == null) return; if (msg == null) return;
@ -1151,7 +1200,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Add to fragments for " + to.toBase64()); _log.debug("Add to fragments for " + to.toBase64());
// See comments in DQAT.java // See comments in DummyThrottle.java
if (USE_PRIORITY) if (USE_PRIORITY)
_outboundMessages.add(msg); _outboundMessages.add(msg);
else // skip the priority queue and go straight to the active pool else // skip the priority queue and go straight to the active pool
@ -1163,6 +1212,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_establisher.establish(msg); _establisher.establish(msg);
} }
} }
void send(I2NPMessage msg, PeerState peer) { void send(I2NPMessage msg, PeerState peer) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Injecting a data message to a new peer: " + peer); _log.debug("Injecting a data message to a new peer: " + peer);
@ -2234,8 +2284,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
} }
for (int i = 0; i < _expireBuffer.size(); i++) for (PeerState peer : _expireBuffer) {
dropPeer(_expireBuffer.get(i), false, "idle too long"); sendDestroy(peer);
dropPeer(peer, false, "idle too long");
}
_expireBuffer.clear(); _expireBuffer.clear();
if (_alive) if (_alive)