* fixed some stupid threading issues in the packet handler (duh)

* use the new raw i2np message format (the previous corruptions were due to above)
* add a new test component (UDPFlooder) which floods all peers at the rate desired
* packet munging fix for highly fragmented messages
* include basic slow start code
* fixed the UDP peer rate refilling
* cleaned up some nextSend scheduling
This commit is contained in:
jrandom
2005-04-16 15:18:09 +00:00
committed by zzz
parent a7dfaee5ac
commit 9e5fe7d2b6
15 changed files with 323 additions and 118 deletions

View File

@ -35,7 +35,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default
public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH; public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH;
private static final boolean RAW_FULL_SIZE = true; private static final boolean RAW_FULL_SIZE = false;
public I2NPMessageImpl(I2PAppContext context) { public I2NPMessageImpl(I2PAppContext context) {
_context = context; _context = context;
@ -123,7 +123,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
+ "data.len=" + data.length + "data.len=" + data.length
+ " offset=" + offset + " offset=" + offset
+ " cur=" + cur + " cur=" + cur
+ " wanted=" + size + "]"); + " wanted=" + size + "]: " + getClass().getName());
SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(size); SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(size);
Hash calc = _context.sha().calculateHash(data, cur, size, cache); Hash calc = _context.sha().calculateHash(data, cur, size, cache);

View File

@ -23,7 +23,7 @@ public class ACKSender implements Runnable {
_log = ctx.logManager().getLog(ACKSender.class); _log = ctx.logManager().getLog(ACKSender.class);
_fragments = fragments; _fragments = fragments;
_transport = transport; _transport = transport;
_builder = new PacketBuilder(_context, _transport); _builder = new PacketBuilder(_context);
} }
public void run() { public void run() {

View File

@ -38,7 +38,7 @@ public class EstablishmentManager {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(EstablishmentManager.class); _log = ctx.logManager().getLog(EstablishmentManager.class);
_transport = transport; _transport = transport;
_builder = new PacketBuilder(ctx, _transport); _builder = new PacketBuilder(ctx);
_inboundStates = new HashMap(32); _inboundStates = new HashMap(32);
_outboundStates = new HashMap(32); _outboundStates = new HashMap(32);
_activityLock = new Object(); _activityLock = new Object();
@ -281,7 +281,7 @@ public class EstablishmentManager {
_log.debug("Send created to: " + state.getRemoteHostInfo()); _log.debug("Send created to: " + state.getRemoteHostInfo());
state.generateSessionKey(); state.generateSessionKey();
_transport.send(_builder.buildSessionCreatedPacket(state)); _transport.send(_builder.buildSessionCreatedPacket(state, _transport.getExternalPort(), _transport.getIntroKey()));
// if they haven't advanced to sending us confirmed packets in 5s, // if they haven't advanced to sending us confirmed packets in 5s,
// repeat // repeat
state.setNextSendTime(now + 5*1000); state.setNextSendTime(now + 5*1000);
@ -308,7 +308,7 @@ public class EstablishmentManager {
// signs if we havent signed yet // signs if we havent signed yet
state.prepareSessionConfirmed(); state.prepareSessionConfirmed();
UDPPacket packets[] = _builder.buildSessionConfirmedPackets(state); UDPPacket packets[] = _builder.buildSessionConfirmedPackets(state, _context.router().getRouterInfo().getIdentity());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Send confirm to: " + state.getRemoteHostInfo()); _log.debug("Send confirm to: " + state.getRemoteHostInfo());

View File

@ -38,7 +38,7 @@ public class InboundMessageFragments {
private static final int RECENTLY_COMPLETED_SIZE = 100; private static final int RECENTLY_COMPLETED_SIZE = 100;
/** how frequently do we want to send ACKs to a peer? */ /** how frequently do we want to send ACKs to a peer? */
private static final int ACK_FREQUENCY = 100; private static final int ACK_FREQUENCY = 200;
public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) { public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) {
_context = ctx; _context = ctx;
@ -132,7 +132,7 @@ public class InboundMessageFragments {
messageComplete = true; messageComplete = true;
messages.remove(messageId); messages.remove(messageId);
while (_recentlyCompletedMessages.size() >= RECENTLY_COMPLETED_SIZE) while (_recentlyCompletedMessages.size() >= RECENTLY_COMPLETED_SIZE)
_recentlyCompletedMessages.remove(0); _recentlyCompletedMessages.remove(0);
_recentlyCompletedMessages.add(messageId); _recentlyCompletedMessages.add(messageId);
@ -147,8 +147,6 @@ public class InboundMessageFragments {
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime()); _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
_stateLock.notifyAll();
} else if (state.isExpired()) { } else if (state.isExpired()) {
messageExpired = true; messageExpired = true;
messages.remove(messageId); messages.remove(messageId);
@ -160,6 +158,8 @@ public class InboundMessageFragments {
if (!fragmentOK) if (!fragmentOK)
break; break;
} }
_stateLock.notifyAll();
} }
} }
@ -167,13 +167,15 @@ public class InboundMessageFragments {
if (data.readACKsIncluded()) { if (data.readACKsIncluded()) {
int fragments = 0; int fragments = 0;
long acks[] = data.readACKs(); long acks[] = data.readACKs();
_context.statManager().addRateData("udp.receivedACKs", acks.length, 0); if (acks != null) {
for (int i = 0; i < acks.length; i++) { _context.statManager().addRateData("udp.receivedACKs", acks.length, 0);
if (_log.shouldLog(Log.INFO)) for (int i = 0; i < acks.length; i++) {
_log.info("Full ACK of message " + acks[i] + " received!"); if (_log.shouldLog(Log.INFO))
fragments += _outbound.acked(acks[i], from.getRemotePeer()); _log.info("Full ACK of message " + acks[i] + " received!");
fragments += _outbound.acked(acks[i], from.getRemotePeer());
}
from.messageACKed(fragments * from.getMTU()); // estimated size
} }
from.messageACKed(fragments * from.getMTU()); // estimated size
} }
if (data.readECN()) if (data.readECN())
from.ECNReceived(); from.ECNReceived();

View File

@ -50,7 +50,11 @@ public class InboundMessageState {
public synchronized boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) { public synchronized boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) {
int fragmentNum = data.readMessageFragmentNum(dataFragment); int fragmentNum = data.readMessageFragmentNum(dataFragment);
if ( (fragmentNum < 0) || (fragmentNum > _fragments.length)) { if ( (fragmentNum < 0) || (fragmentNum > _fragments.length)) {
_log.log(Log.CRIT, "Invalid fragment " + fragmentNum + ": " + data, new Exception("source")); StringBuffer buf = new StringBuffer(1024);
buf.append("Invalid fragment ").append(fragmentNum);
buf.append(": ").append(data);
data.toRawString(buf);
_log.log(Log.CRIT, buf.toString(), new Exception("source"));
return false; return false;
} }
if (_fragments[fragmentNum] == null) { if (_fragments[fragmentNum] == null) {

View File

@ -61,8 +61,8 @@ public class MessageReceiver implements Runnable {
m.setUniqueId(state.getMessageId()); m.setUniqueId(state.getMessageId());
return m; return m;
} catch (I2NPMessageException ime) { } catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.ERROR))
_log.warn("Message invalid: " + state, ime); _log.error("Message invalid: " + state, ime);
return null; return null;
} catch (Exception e) { } catch (Exception e) {
_log.log(Log.CRIT, "Error dealing with a message: " + state, e); _log.log(Log.CRIT, "Error dealing with a message: " + state, e);

View File

@ -42,7 +42,7 @@ public class OutboundMessageFragments {
_transport = transport; _transport = transport;
_activeMessages = new ArrayList(MAX_ACTIVE); _activeMessages = new ArrayList(MAX_ACTIVE);
_nextPacketMessage = 0; _nextPacketMessage = 0;
_builder = new PacketBuilder(ctx, _transport); _builder = new PacketBuilder(ctx);
_alive = true; _alive = true;
_context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
@ -89,10 +89,11 @@ public class OutboundMessageFragments {
*/ */
public void add(OutNetMessage msg) { public void add(OutNetMessage msg) {
OutboundMessageState state = new OutboundMessageState(_context); OutboundMessageState state = new OutboundMessageState(_context);
state.initialize(msg); boolean ok = state.initialize(msg);
finishMessages(); finishMessages();
synchronized (_activeMessages) { synchronized (_activeMessages) {
_activeMessages.add(state); if (ok)
_activeMessages.add(state);
_activeMessages.notifyAll(); _activeMessages.notifyAll();
} }
} }
@ -129,14 +130,14 @@ public class OutboundMessageFragments {
// it can not have an OutNetMessage if the source is the // it can not have an OutNetMessage if the source is the
// final after establishment message // final after establishment message
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send a direct message: " + state); _log.warn("Unable to send an expired direct message: " + state);
} }
i--; i--;
} else if (state.getPushCount() > MAX_VOLLEYS) { } else if (state.getPushCount() > MAX_VOLLEYS) {
_activeMessages.remove(i); _activeMessages.remove(i);
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
if (state.getPeer() != null) //if (state.getPeer() != null)
state.getPeer().congestionOccurred(); // state.getPeer().congestionOccurred();
if (state.getMessage() != null) { if (state.getMessage() != null) {
_transport.failed(state.getMessage()); _transport.failed(state.getMessage());
@ -144,7 +145,7 @@ public class OutboundMessageFragments {
// it can not have an OutNetMessage if the source is the // it can not have an OutNetMessage if the source is the
// final after establishment message // final after establishment message
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send a direct message: " + state); _log.warn("Unable to send a direct message after too many volleys: " + state);
} }
i--; i--;
@ -198,25 +199,31 @@ public class OutboundMessageFragments {
int fragmentSize = state.fragmentSize(currentFragment); int fragmentSize = state.fragmentSize(currentFragment);
if (peer.allocateSendingBytes(fragmentSize)) { if (peer.allocateSendingBytes(fragmentSize)) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + fragmentSize + " allowed"); _log.info("Allocation of " + fragmentSize + " allowed with "
+ peer.getSendWindowBytesRemaining()
+ "/" + peer.getSendWindowBytes()
+ " remaining"
+ " for message " + state.getMessageId() + ": " + state);
// for fairness, we move on in a round robin // for fairness, we move on in a round robin
_nextPacketMessage = i + 1; _nextPacketMessage = i + 1;
if (state.getPushCount() != oldVolley) { if (state.getPushCount() != oldVolley) {
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount()); _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount());
state.setNextSendTime(now + 500); state.setNextSendTime(now + (1000-(now%1000)) + _context.random().nextInt(2000));
} else { } else {
if (peer.getSendWindowBytesRemaining() > 0) if (peer.getSendWindowBytesRemaining() > 0)
state.setNextSendTime(now); state.setNextSendTime(now);
else else
state.setNextSendTime(now + 50 ); state.setNextSendTime(now + (1000-(now%1000)));
} }
break; break;
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + fragmentSize + " rejected"); _log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes()
state.setNextSendTime(now + _context.random().nextInt(500)); + " available=" + peer.getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime(now + (1000-(now%1000)));
currentFragment = -1; currentFragment = -1;
} }
} }
@ -229,15 +236,15 @@ public class OutboundMessageFragments {
if (currentFragment < 0) { if (currentFragment < 0) {
if (nextSend <= 0) { if (nextSend <= 0) {
try { try {
_activeMessages.wait(100); _activeMessages.wait();
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} else { } else {
// none of the packets were eligible for sending // none of the packets were eligible for sending
long delay = nextSend - now; long delay = nextSend - now;
if (delay <= 0) if (delay <= 0)
delay = 10; delay = 10;
if (delay > 500) if (delay > 1000)
delay = 500; delay = 1000;
try { try {
_activeMessages.wait(delay); _activeMessages.wait(delay);
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}

View File

@ -4,7 +4,7 @@ import java.util.Arrays;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext; import net.i2p.I2PAppContext;
import net.i2p.router.OutNetMessage; import net.i2p.router.OutNetMessage;
import net.i2p.util.ByteCache; import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -14,7 +14,7 @@ import net.i2p.util.Log;
* *
*/ */
public class OutboundMessageState { public class OutboundMessageState {
private RouterContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
/** may be null if we are part of the establishment */ /** may be null if we are part of the establishment */
private OutNetMessage _message; private OutNetMessage _message;
@ -35,19 +35,35 @@ public class OutboundMessageState {
public static final int MAX_FRAGMENTS = 32; public static final int MAX_FRAGMENTS = 32;
private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024); private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024);
public OutboundMessageState(RouterContext context) { public OutboundMessageState(I2PAppContext context) {
_context = context; _context = context;
_log = _context.logManager().getLog(OutboundMessageState.class); _log = _context.logManager().getLog(OutboundMessageState.class);
_pushCount = 0; _pushCount = 0;
_maxSends = 0; _maxSends = 0;
} }
public synchronized void initialize(OutNetMessage msg) { public synchronized boolean initialize(OutNetMessage msg) {
initialize(msg, msg.getMessage(), null); try {
initialize(msg, msg.getMessage(), null);
return true;
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Exception e) {
_log.log(Log.CRIT, "Error initializing " + msg, e);
return false;
}
} }
public void initialize(I2NPMessage msg, PeerState peer) { public boolean initialize(I2NPMessage msg, PeerState peer) {
initialize(null, msg, peer); try {
initialize(null, msg, peer);
return true;
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Exception e) {
_log.log(Log.CRIT, "Error initializing " + msg, e);
return false;
}
} }
private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
@ -227,6 +243,10 @@ public class OutboundMessageState {
buf.append("Message ").append(_messageId); buf.append("Message ").append(_messageId);
if (_fragmentSends != null) if (_fragmentSends != null)
buf.append(" with ").append(_fragmentSends.length).append(" fragments"); buf.append(" with ").append(_fragmentSends.length).append(" fragments");
if (_messageBuf != null)
buf.append(" of size ").append(_messageBuf.getValid());
buf.append(" volleys: ").append(_maxSends);
buf.append(" lifetime: ").append(getLifetime());
return buf.toString(); return buf.toString();
} }
} }

View File

@ -6,13 +6,14 @@ import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.data.Signature; import net.i2p.data.Signature;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache; import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
@ -22,16 +23,14 @@ import net.i2p.util.Log;
* *
*/ */
public class PacketBuilder { public class PacketBuilder {
private RouterContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private UDPTransport _transport;
private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE); private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
public PacketBuilder(RouterContext ctx, UDPTransport transport) { public PacketBuilder(I2PAppContext ctx) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(PacketBuilder.class); _log = ctx.logManager().getLog(PacketBuilder.class);
_transport = transport;
} }
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer) { public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer) {
@ -136,7 +135,7 @@ public class PacketBuilder {
* *
* @return ready to send packet, or null if there was a problem * @return ready to send packet, or null if there was a problem
*/ */
public UDPPacket buildSessionCreatedPacket(InboundEstablishState state) { public UDPPacket buildSessionCreatedPacket(InboundEstablishState state, int externalPort, SessionKey ourIntroKey) {
UDPPacket packet = UDPPacket.acquire(_context); UDPPacket packet = UDPPacket.acquire(_context);
try { try {
packet.getPacket().setAddress(InetAddress.getByAddress(state.getSentIP())); packet.getPacket().setAddress(InetAddress.getByAddress(state.getSentIP()));
@ -187,7 +186,7 @@ public class PacketBuilder {
buf.append(" AliceIP: ").append(Base64.encode(state.getSentIP())); buf.append(" AliceIP: ").append(Base64.encode(state.getSentIP()));
buf.append(" AlicePort: ").append(state.getSentPort()); buf.append(" AlicePort: ").append(state.getSentPort());
buf.append(" BobIP: ").append(Base64.encode(state.getReceivedOurIP())); buf.append(" BobIP: ").append(Base64.encode(state.getReceivedOurIP()));
buf.append(" BobPort: ").append(_transport.getExternalPort()); buf.append(" BobPort: ").append(externalPort);
buf.append(" RelayTag: ").append(state.getSentRelayTag()); buf.append(" RelayTag: ").append(state.getSentRelayTag());
buf.append(" SignedOn: ").append(state.getSentSignedOnTime()); buf.append(" SignedOn: ").append(state.getSentSignedOnTime());
buf.append(" signature: ").append(Base64.encode(state.getSentSignature().getData())); buf.append(" signature: ").append(Base64.encode(state.getSentSignature().getData()));
@ -209,7 +208,7 @@ public class PacketBuilder {
if ( (off % 16) != 0) if ( (off % 16) != 0)
off += 16 - (off % 16); off += 16 - (off % 16);
packet.getPacket().setLength(off); packet.getPacket().setLength(off);
authenticate(packet, _transport.getIntroKey(), _transport.getIntroKey(), iv); authenticate(packet, ourIntroKey, ourIntroKey, iv);
setTo(packet, state.getSentIP(), state.getSentPort()); setTo(packet, state.getSentIP(), state.getSentPort());
_ivCache.release(iv); _ivCache.release(iv);
return packet; return packet;
@ -279,8 +278,8 @@ public class PacketBuilder {
* *
* @return ready to send packets, or null if there was a problem * @return ready to send packets, or null if there was a problem
*/ */
public UDPPacket[] buildSessionConfirmedPackets(OutboundEstablishState state) { public UDPPacket[] buildSessionConfirmedPackets(OutboundEstablishState state, RouterIdentity ourIdentity) {
byte identity[] = _context.router().getRouterInfo().getIdentity().toByteArray(); byte identity[] = ourIdentity.toByteArray();
int numFragments = identity.length / MAX_IDENTITY_FRAGMENT_SIZE; int numFragments = identity.length / MAX_IDENTITY_FRAGMENT_SIZE;
if (numFragments * MAX_IDENTITY_FRAGMENT_SIZE != identity.length) if (numFragments * MAX_IDENTITY_FRAGMENT_SIZE != identity.length)
numFragments++; numFragments++;

View File

@ -20,17 +20,16 @@ import net.i2p.util.Log;
* receiver's queue and pushing them as necessary. * receiver's queue and pushing them as necessary.
* *
*/ */
public class PacketHandler implements Runnable { public class PacketHandler {
private RouterContext _context; private RouterContext _context;
private Log _log; private Log _log;
private UDPTransport _transport; private UDPTransport _transport;
private UDPEndpoint _endpoint; private UDPEndpoint _endpoint;
private UDPPacketReader _reader;
private EstablishmentManager _establisher; private EstablishmentManager _establisher;
private InboundMessageFragments _inbound; private InboundMessageFragments _inbound;
private boolean _keepReading; private boolean _keepReading;
private static final int NUM_HANDLERS = 3; private static final int NUM_HANDLERS = 4;
public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound) { public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound) {
_context = ctx; _context = ctx;
@ -39,7 +38,6 @@ public class PacketHandler implements Runnable {
_endpoint = endpoint; _endpoint = endpoint;
_establisher = establisher; _establisher = establisher;
_inbound = inbound; _inbound = inbound;
_reader = new UDPPacketReader(ctx);
_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 });
@ -48,7 +46,7 @@ public class PacketHandler implements Runnable {
public void startup() { public void startup() {
_keepReading = true; _keepReading = true;
for (int i = 0; i < NUM_HANDLERS; i++) { for (int i = 0; i < NUM_HANDLERS; i++) {
I2PThread t = new I2PThread(this, "Packet handler " + i + ": " + _endpoint.getListenPort()); I2PThread t = new I2PThread(new Handler(), "Packet handler " + i + ": " + _endpoint.getListenPort());
t.setDaemon(true); t.setDaemon(true);
t.start(); t.start();
} }
@ -58,30 +56,37 @@ public class PacketHandler implements Runnable {
_keepReading = false; _keepReading = false;
} }
public void run() { private class Handler implements Runnable {
while (_keepReading) { private UDPPacketReader _reader;
UDPPacket packet = _endpoint.receive(); public Handler() {
if (_log.shouldLog(Log.DEBUG)) _reader = new UDPPacketReader(_context);
_log.debug("Received the packet " + packet); }
long queueTime = packet.getLifetime();
long handleStart = _context.clock().now();
handlePacket(packet);
long handleTime = _context.clock().now() - handleStart;
_context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime());
_context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime());
if (handleTime > 1000) { public void run() {
if (_log.shouldLog(Log.WARN)) while (_keepReading) {
_log.warn("Took " + handleTime + " to process the packet " UDPPacket packet = _endpoint.receive();
+ packet + ": " + _reader); if (_log.shouldLog(Log.DEBUG))
_log.debug("Received the packet " + packet);
long queueTime = packet.getLifetime();
long handleStart = _context.clock().now();
handlePacket(_reader, packet);
long handleTime = _context.clock().now() - handleStart;
_context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime());
_context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime());
if (handleTime > 1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("Took " + handleTime + " to process the packet "
+ packet + ": " + _reader);
}
// back to the cache with thee!
packet.release();
} }
// back to the cache with thee!
packet.release();
} }
} }
private void handlePacket(UDPPacket packet) { private void handlePacket(UDPPacketReader reader, UDPPacket packet) {
if (packet == null) return; if (packet == null) return;
InetAddress remAddr = packet.getPacket().getAddress(); InetAddress remAddr = packet.getPacket().getAddress();
@ -94,7 +99,7 @@ public class PacketHandler implements Runnable {
if (est != null) { if (est != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an inbound establishment"); _log.debug("Packet received IS for an inbound establishment");
receivePacket(packet, est); receivePacket(reader, packet, est);
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound establishment"); _log.debug("Packet received is not for an inbound establishment");
@ -102,22 +107,22 @@ public class PacketHandler implements Runnable {
if (oest != null) { if (oest != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an outbound establishment"); _log.debug("Packet received IS for an outbound establishment");
receivePacket(packet, oest); receivePacket(reader, packet, oest);
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound or outbound establishment"); _log.debug("Packet received is not for an inbound or outbound establishment");
// ok, not already known establishment, try as a new one // ok, not already known establishment, try as a new one
receivePacket(packet); receivePacket(reader, packet);
} }
} }
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an existing peer"); _log.debug("Packet received IS for an existing peer");
receivePacket(packet, state); receivePacket(reader, packet, state);
} }
} }
private void receivePacket(UDPPacket packet, PeerState state) { private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) {
boolean isValid = packet.validate(state.getCurrentMACKey()); boolean isValid = packet.validate(state.getCurrentMACKey());
if (!isValid) { if (!isValid) {
if (state.getNextMACKey() != null) if (state.getNextMACKey() != null)
@ -147,10 +152,10 @@ public class PacketHandler implements Runnable {
packet.decrypt(state.getCurrentCipherKey()); packet.decrypt(state.getCurrentCipherKey());
} }
handlePacket(packet, state, null, null); handlePacket(reader, packet, state, null, null);
} }
private void receivePacket(UDPPacket packet) { private void receivePacket(UDPPacketReader reader, UDPPacket packet) {
boolean isValid = packet.validate(_transport.getIntroKey()); boolean isValid = packet.validate(_transport.getIntroKey());
if (!isValid) { if (!isValid) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -162,10 +167,10 @@ public class PacketHandler implements Runnable {
} }
packet.decrypt(_transport.getIntroKey()); packet.decrypt(_transport.getIntroKey());
handlePacket(packet, null, null, null); handlePacket(reader, packet, null, null, null);
} }
private void receivePacket(UDPPacket packet, InboundEstablishState state) { private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) {
if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) {
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(128);
buf.append("Attempting to receive a packet on a known inbound state: "); buf.append("Attempting to receive a packet on a known inbound state: ");
@ -182,7 +187,7 @@ public class PacketHandler implements Runnable {
_log.warn("Valid introduction packet received for inbound con: " + packet); _log.warn("Valid introduction packet received for inbound con: " + packet);
packet.decrypt(state.getCipherKey()); packet.decrypt(state.getCipherKey());
handlePacket(packet, null, null, null); handlePacket(reader, packet, null, null, null);
return; return;
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -192,10 +197,10 @@ public class PacketHandler implements Runnable {
} }
// ok, we couldn't handle it with the established stuff, so fall back // ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets // on earlier state packets
receivePacket(packet); receivePacket(reader, packet);
} }
private void receivePacket(UDPPacket packet, OutboundEstablishState state) { private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) {
if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) {
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(128);
buf.append("Attempting to receive a packet on a known outbound state: "); buf.append("Attempting to receive a packet on a known outbound state: ");
@ -213,7 +218,7 @@ public class PacketHandler implements Runnable {
_log.warn("Valid introduction packet received for outbound established con: " + packet); _log.warn("Valid introduction packet received for outbound established con: " + packet);
packet.decrypt(state.getCipherKey()); packet.decrypt(state.getCipherKey());
handlePacket(packet, null, state, null); handlePacket(reader, packet, null, state, null);
return; return;
} }
} }
@ -224,7 +229,7 @@ public class PacketHandler implements Runnable {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet); _log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet);
packet.decrypt(state.getIntroKey()); packet.decrypt(state.getIntroKey());
handlePacket(packet, null, state, null); handlePacket(reader, packet, null, state, null);
return; return;
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -233,7 +238,7 @@ public class PacketHandler implements Runnable {
// ok, we couldn't handle it with the established stuff, so fall back // ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets // on earlier state packets
receivePacket(packet); receivePacket(reader, packet);
} }
/** let packets be up to 30s slow */ /** let packets be up to 30s slow */
@ -242,10 +247,10 @@ public class PacketHandler implements Runnable {
/** /**
* Parse out the interesting bits and honor what it says * Parse out the interesting bits and honor what it says
*/ */
private void handlePacket(UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) { private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) {
_reader.initialize(packet); reader.initialize(packet);
long now = _context.clock().now(); long now = _context.clock().now();
long when = _reader.readTimestamp() * 1000; long when = reader.readTimestamp() * 1000;
long skew = now - when; long skew = now - when;
if (skew > GRACE_PERIOD) { if (skew > GRACE_PERIOD) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -263,31 +268,27 @@ public class PacketHandler implements Runnable {
int fromPort = packet.getPacket().getPort(); int fromPort = packet.getPacket().getPort();
String from = PeerState.calculateRemoteHostString(fromHost.getAddress(), fromPort); String from = PeerState.calculateRemoteHostString(fromHost.getAddress(), fromPort);
switch (_reader.readPayloadType()) { switch (reader.readPayloadType()) {
case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST: case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST:
_establisher.receiveSessionRequest(from, fromHost, fromPort, _reader); _establisher.receiveSessionRequest(from, fromHost, fromPort, reader);
break; break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED: case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED:
_establisher.receiveSessionConfirmed(from, _reader); _establisher.receiveSessionConfirmed(from, reader);
break; break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED: case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED:
_establisher.receiveSessionCreated(from, _reader); _establisher.receiveSessionCreated(from, reader);
break; break;
case UDPPacket.PAYLOAD_TYPE_DATA: case UDPPacket.PAYLOAD_TYPE_DATA:
if (outState != null) if (outState != null)
state = _establisher.receiveData(outState); state = _establisher.receiveData(outState);
handleData(packet, state); if (_log.shouldLog(Log.INFO))
_log.info("Received new DATA packet from " + state + ": " + packet);
_inbound.receiveData(state, reader.getDataReader());
break; break;
default: default:
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Unknown payload type: " + _reader.readPayloadType()); _log.warn("Unknown payload type: " + reader.readPayloadType());
return; return;
} }
} }
private void handleData(UDPPacket packet, PeerState peer) {
if (_log.shouldLog(Log.INFO))
_log.info("Received new DATA packet from " + peer + ": " + packet);
_inbound.receiveData(peer, _reader.getDataReader());
}
} }

View File

@ -79,6 +79,14 @@ public class PeerState {
private int _sendWindowBytes; private int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */ /** how many bytes can we send to the peer in the current second */
private int _sendWindowBytesRemaining; private int _sendWindowBytesRemaining;
private long _lastSendRefill;
private long _lastCongestionOccurred;
/**
* when sendWindowBytes is below this, grow the window size quickly,
* but after we reach it, grow it slowly
*
*/
private int _slowStartThreshold;
/** what IP is the peer sending and receiving packets on? */ /** what IP is the peer sending and receiving packets on? */
private byte[] _remoteIP; private byte[] _remoteIP;
/** what port is the peer sending and receiving packets on? */ /** what port is the peer sending and receiving packets on? */
@ -108,7 +116,8 @@ public class PeerState {
private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024; private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024;
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int DEFAULT_MTU = 512; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
private static final int DEFAULT_MTU = 1024;
public PeerState(I2PAppContext ctx) { public PeerState(I2PAppContext ctx) {
_context = ctx; _context = ctx;
@ -130,6 +139,9 @@ public class PeerState {
_remoteWantsPreviousACKs = false; _remoteWantsPreviousACKs = false;
_sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES; _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
_sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES; _sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
_lastSendRefill = _context.clock().now();
_lastCongestionOccurred = -1;
_remoteIP = null; _remoteIP = null;
_remotePort = -1; _remotePort = -1;
_remoteRequiresIntroduction = false; _remoteRequiresIntroduction = false;
@ -298,9 +310,9 @@ public class PeerState {
*/ */
public boolean allocateSendingBytes(int size) { public boolean allocateSendingBytes(int size) {
long now = _context.clock().now(); long now = _context.clock().now();
if (_lastSendTime > 0) { if (_lastSendRefill + 1000 <= now) {
if (_lastSendTime + 1000 <= now) _sendWindowBytesRemaining = _sendWindowBytes;
_sendWindowBytesRemaining = _sendWindowBytes; _lastSendRefill = now;
} }
if (size <= _sendWindowBytesRemaining) { if (size <= _sendWindowBytesRemaining) {
_sendWindowBytesRemaining -= size; _sendWindowBytesRemaining -= size;
@ -334,6 +346,7 @@ public class PeerState {
_mtu = mtu; _mtu = mtu;
_mtuLastChecked = _context.clock().now(); _mtuLastChecked = _context.clock().now();
} }
public int getSlowStartThreshold() { return _slowStartThreshold; }
/** we received the message specified completely */ /** we received the message specified completely */
public void messageFullyReceived(Long messageId) { public void messageFullyReceived(Long messageId) {
@ -350,9 +363,16 @@ public class PeerState {
* *
*/ */
public void congestionOccurred() { public void congestionOccurred() {
long now = _context.clock().now();
if (_lastCongestionOccurred + 2000 > now)
return; // only shrink once every other second
_lastCongestionOccurred = now;
_sendWindowBytes /= 2; _sendWindowBytes /= 2;
if (_sendWindowBytes < MINIMUM_WINDOW_BYTES) if (_sendWindowBytes < MINIMUM_WINDOW_BYTES)
_sendWindowBytes = MINIMUM_WINDOW_BYTES; _sendWindowBytes = MINIMUM_WINDOW_BYTES;
if (_sendWindowBytes < _slowStartThreshold)
_slowStartThreshold = _sendWindowBytes;
} }
/** pull off the ACKs (Long) to send to the peer */ /** pull off the ACKs (Long) to send to the peer */
@ -368,7 +388,15 @@ public class PeerState {
/** we sent a message which was ACKed containing the given # of bytes */ /** we sent a message which was ACKed containing the given # of bytes */
public void messageACKed(int bytesACKed) { public void messageACKed(int bytesACKed) {
_consecutiveSendingSecondsWithoutACKs = 0; _consecutiveSendingSecondsWithoutACKs = 0;
_sendWindowBytes += bytesACKed; if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
} else {
double prob = bytesACKed / _sendWindowBytes;
if (_context.random().nextDouble() <= prob)
_sendWindowBytes += bytesACKed;
}
if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
_sendWindowBytes = MAX_SEND_WINDOW_BYTES;
_lastReceiveTime = _context.clock().now(); _lastReceiveTime = _context.clock().now();
_messagesSent++; _messagesSent++;
} }

View File

@ -0,0 +1,97 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DataMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
*
*/
class UDPFlooder implements Runnable {
private RouterContext _context;
private Log _log;
private UDPTransport _transport;
private List _peers;
private boolean _alive;
public UDPFlooder(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(UDPFlooder.class);
_transport = transport;
_peers = new ArrayList(4);
}
public void addPeer(PeerState peer) {
synchronized (_peers) {
_peers.add(peer);
_peers.notifyAll();
}
}
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "flooder");
t.setDaemon(true);
t.start();
}
public void shutdown() {
_alive = false;
synchronized (_peers) {
_peers.notifyAll();
}
}
public void run() {
while (_alive) {
try {
synchronized (_peers) {
if (_peers.size() <= 0)
_peers.wait();
}
} catch (InterruptedException ie) {}
// peers always grows, so this is fairly safe
for (int i = 0; i < _peers.size(); i++) {
PeerState peer = (PeerState)_peers.get(i);
DataMessage m = new DataMessage(_context);
byte data[] = new byte[4096];
_context.random().nextBytes(data);
m.setData(data);
m.setMessageExpiration(_context.clock().now() + 10*1000);
m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
if (true) {
OutNetMessage msg = new OutNetMessage(_context);
msg.setMessage(m);
msg.setExpiration(m.getMessageExpiration());
msg.setPriority(500);
RouterInfo to = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());
if (to == null)
continue;
msg.setTarget(to);
_context.statManager().getStatLog().addData(peer.getRemotePeer().toBase64().substring(0,6), "udp.floodDataSent", 1, 0);
_transport.send(msg);
} else {
_transport.send(m, peer);
}
}
long floodDelay = calcFloodDelay();
try { Thread.sleep(floodDelay); } catch (InterruptedException ie) {}
}
}
private long calcFloodDelay() {
try {
return Long.parseLong(_context.getProperty("udp.floodDelay", "30000"));
} catch (Exception e) {
return 30*1000;
}
}
}

View File

@ -52,7 +52,7 @@ public class UDPPacketReader {
/** what type of payload is in here? */ /** what type of payload is in here? */
public int readPayloadType() { public int readPayloadType() {
// 3 highest order bits == payload type // 3 highest order bits == payload type
return _message[_payloadBeginOffset] >>> 4; return (_message[_payloadBeginOffset] & 0xFF) >>> 4;
} }
/** does this packet include rekeying data? */ /** does this packet include rekeying data? */
@ -106,6 +106,11 @@ public class UDPPacketReader {
} }
} }
public void toRawString(StringBuffer buf) {
if (_message != null)
buf.append(Base64.encode(_message, _payloadBeginOffset, _payloadLength));
}
/** Help read the SessionRequest payload */ /** Help read the SessionRequest payload */
public class SessionRequestReader { public class SessionRequestReader {
public static final int X_LENGTH = 256; public static final int X_LENGTH = 256;
@ -187,7 +192,7 @@ public class UDPPacketReader {
/** which fragment is this? */ /** which fragment is this? */
public int readCurrentFragmentNum() { public int readCurrentFragmentNum() {
int readOffset = readBodyOffset(); int readOffset = readBodyOffset();
return _message[readOffset] >>> 4; return (_message[readOffset] & 0xFF) >>> 4;
} }
/** how many fragments will there be? */ /** how many fragments will there be? */
public int readTotalFragmentNum() { public int readTotalFragmentNum() {
@ -323,7 +328,7 @@ public class UDPPacketReader {
public int readMessageFragmentNum(int fragmentNum) { public int readMessageFragmentNum(int fragmentNum) {
int off = getFragmentBegin(fragmentNum); int off = getFragmentBegin(fragmentNum);
off += 4; // messageId off += 4; // messageId
return _message[off] >>> 3; return (_message[off] & 0xFF) >>> 3;
} }
public boolean readMessageIsLast(int fragmentNum) { public boolean readMessageIsLast(int fragmentNum) {
int off = getFragmentBegin(fragmentNum); int off = getFragmentBegin(fragmentNum);
@ -434,7 +439,13 @@ public class UDPPacketReader {
for (int i = 0; i < numFragments; i++) { for (int i = 0; i < numFragments; i++) {
buf.append("containing messageId "); buf.append("containing messageId ");
buf.append(DataHelper.fromLong(_message, off, 4)); buf.append(DataHelper.fromLong(_message, off, 4));
off += 5; // messageId+info off += 4;
int fragNum = (_message[off] & 0XFF) >>> 3;
boolean isLast = (_message[off] & (1 << 2)) != 0;
off++;
buf.append(" frag# ").append(fragNum);
buf.append(" isLast? ").append(isLast);
buf.append(" info ").append((int)_message[off-1]);
int size = (int)DataHelper.fromLong(_message, off, 2); int size = (int)DataHelper.fromLong(_message, off, 2);
buf.append(" with ").append(size).append(" bytes"); buf.append(" with ").append(size).append(" bytes");
buf.append(' '); buf.append(' ');
@ -444,5 +455,17 @@ public class UDPPacketReader {
return buf.toString(); return buf.toString();
} }
public void toRawString(StringBuffer buf) {
UDPPacketReader.this.toRawString(buf);
buf.append(" payload: ");
int off = getFragmentBegin(0); // first fragment
off += 4; // messageId
off++; // fragment info
int size = (int)DataHelper.fromLong(_message, off, 2);
off += 2;
buf.append(Base64.encode(_message, off, size));
}
} }
} }

View File

@ -49,6 +49,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private OutboundRefiller _refiller; private OutboundRefiller _refiller;
private PacketPusher _pusher; private PacketPusher _pusher;
private InboundMessageFragments _inboundFragments; private InboundMessageFragments _inboundFragments;
private UDPFlooder _flooder;
/** list of RelayPeer objects for people who will relay to us */ /** list of RelayPeer objects for people who will relay to us */
private List _relayPeers; private List _relayPeers;
@ -84,6 +85,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** configure the priority queue with the given weighting per priority group */ /** configure the priority queue with the given weighting per priority group */
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? */
private static final boolean SHOULD_FLOOD_PEERS = false;
public UDPTransport(RouterContext ctx) { public UDPTransport(RouterContext ctx) {
super(ctx); super(ctx);
_context = ctx; _context = ctx;
@ -101,6 +105,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments = new OutboundMessageFragments(_context, this); _fragments = new OutboundMessageFragments(_context, this);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this); _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this);
} }
public void startup() { public void startup() {
@ -118,6 +123,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_refiller.shutdown(); _refiller.shutdown();
if (_inboundFragments != null) if (_inboundFragments != null)
_inboundFragments.shutdown(); _inboundFragments.shutdown();
if (_flooder != null)
_flooder.shutdown();
_introKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]); _introKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
System.arraycopy(_context.routerHash().getData(), 0, _introKey.getData(), 0, SessionKey.KEYSIZE_BYTES); System.arraycopy(_context.routerHash().getData(), 0, _introKey.getData(), 0, SessionKey.KEYSIZE_BYTES);
@ -165,6 +172,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_refiller == null) if (_refiller == null)
_refiller = new OutboundRefiller(_context, _fragments, _outboundMessages); _refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
if (_flooder == null)
_flooder = new UDPFlooder(_context, this);
_endpoint.startup(); _endpoint.startup();
_establisher.startup(); _establisher.startup();
_handler.startup(); _handler.startup();
@ -173,9 +183,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_pusher = new PacketPusher(_context, _fragments, _endpoint.getSender()); _pusher = new PacketPusher(_context, _fragments, _endpoint.getSender());
_pusher.startup(); _pusher.startup();
_refiller.startup(); _refiller.startup();
_flooder.startup();
} }
public void shutdown() { public void shutdown() {
if (_flooder != null)
_flooder.shutdown();
if (_refiller != null) if (_refiller != null)
_refiller.shutdown(); _refiller.shutdown();
if (_handler != null) if (_handler != null)
@ -296,6 +309,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_context.shitlist().unshitlistRouter(peer.getRemotePeer()); _context.shitlist().unshitlistRouter(peer.getRemotePeer());
if (SHOULD_FLOOD_PEERS)
_flooder.addPeer(peer);
return true; return true;
} }
@ -321,6 +337,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public String getStyle() { return STYLE; } public String getStyle() { return STYLE; }
public void send(OutNetMessage msg) { public void send(OutNetMessage msg) {
if (msg == null) return;
if (msg.getTarget() == null) return;
if (msg.getTarget().getIdentity() == null) return;
Hash to = msg.getTarget().getIdentity().calculateHash(); Hash to = msg.getTarget().getIdentity().calculateHash();
if (getPeerState(to) != null) { if (getPeerState(to) != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -471,7 +491,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("<table border=\"1\">\n"); buf.append("<table border=\"1\">\n");
buf.append(" <tr><td><b>Peer</b></td><td><b>Location</b></td>\n"); buf.append(" <tr><td><b>Peer</b></td><td><b>Location</b></td>\n");
buf.append(" <td><b>Last send</b></td><td><b>Last recv</b></td>\n"); buf.append(" <td><b>Last send</b></td><td><b>Last recv</b></td>\n");
buf.append(" <td><b>Lifetime</b></td><td><b>Window size</b></td>\n"); buf.append(" <td><b>Lifetime</b></td><td><b>cwnd</b></td><td><b>ssthresh</b></td>\n");
buf.append(" <td><b>Sent</b></td><td><b>Received</b></td>\n"); buf.append(" <td><b>Sent</b></td><td><b>Received</b></td>\n");
buf.append(" </tr>\n"); buf.append(" </tr>\n");
out.write(buf.toString()); out.write(buf.toString());
@ -517,6 +537,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(peer.getSendWindowBytes()); buf.append(peer.getSendWindowBytes());
buf.append("</td>"); buf.append("</td>");
buf.append("<td>");
buf.append(peer.getSlowStartThreshold());
buf.append("</td>");
buf.append("<td>"); buf.append("<td>");
buf.append(peer.getMessagesSent()); buf.append(peer.getMessagesSent());
buf.append("</td>"); buf.append("</td>");
@ -533,7 +557,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
out.write("</table>\n"); out.write("</table>\n");
} }
/** /**
* Cache the bid to reduce object churn * Cache the bid to reduce object churn
*/ */

View File

@ -240,7 +240,8 @@ class TestJob extends JobImpl {
} }
public String getName() { return "Tunnel test timeout"; } public String getName() { return "Tunnel test timeout"; }
public void runJob() { public void runJob() {
_log.error("Timeout: found? " + _found, getAddedBy()); if (_log.shouldLog(Log.WARN))
_log.warn("Timeout: found? " + _found, getAddedBy());
if (!_found) if (!_found)
testFailed(getContext().clock().now() - _started); testFailed(getContext().clock().now() - _started);
} }